
Lesson 8 Case Studies

Pragmatic AI Labs


This notebook was produced by Pragmatic AI Labs. You can continue learning about these topics with the lesson outline below:

Lesson 8: Case Studies (60 min)

  • 8.1 Understand Big Data for Sagemaker (12 min)
  • 8.2 Learn Sagemaker and EMR Integration (12 min)
  • 8.3 Learn Serverless Production Big Data Application Development (12 min)
  • 8.4 Implement Containerization for Big Data (12 min)
  • 8.5 Implement Spot Instances for Big Data Pipeline (12 min)

8.1 Understand Big Data for Sagemaker

Manage Machine Learning Experiments with Search

  • Find training jobs (see the search sketch below)
  • Rank training jobs
  • Trace the lineage of a model
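
A minimal sketch of the search workflow above using the boto3 SageMaker Search API; the job-name filter value is a hypothetical placeholder:

import boto3

sm = boto3.client("sagemaker")

# Find completed training jobs whose name matches a (hypothetical) experiment prefix,
# newest first; filtering and sorting is how jobs are found and ranked
results = sm.search(
    Resource="TrainingJob",
    SearchExpression={
        "Filters": [
            {"Name": "TrainingJobName", "Operator": "Contains", "Value": "kmeans-census"},
            {"Name": "TrainingJobStatus", "Operator": "Equals", "Value": "Completed"},
        ]
    },
    SortBy="CreationTime",
    SortOrder="Descending",
    MaxResults=10,
)

for result in results["Results"]:
    job = result["TrainingJob"]
    print(job["TrainingJobName"], job["TrainingJobStatus"])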

Ground Truth


  • Set up and manage labeling jobs (see the inspection sketch below)
  • Uses active learning and human labeling
  • First 500 objects labeled per month are free
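
A minimal sketch of inspecting labeling jobs with boto3; the job name passed to describe_labeling_job is a hypothetical placeholder:

import boto3

sm = boto3.client("sagemaker")

# List recent labeling jobs and their status
for job in sm.list_labeling_jobs(MaxResults=10)["LabelingJobSummaryList"]:
    print(job["LabelingJobName"], job["LabelingJobStatus"])

# Drill into one job to see labeled/unlabeled object counts (name is a placeholder)
counters = sm.describe_labeling_job(LabelingJobName="my-labeling-job")["LabelCounters"]
print(counters)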

[Demo] Labeling Job



[Demo] Sagemaker Notebooks

  • Create and run Jupyter Notebooks
      • Using Jupyter
      • Using JupyterLab
      • Using the terminal
  • Lifecycle configurations
  • Git repositories
      • Public repositories can be cloned on Notebook launch (see the boto3 sketch below)
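
A minimal sketch of launching a notebook instance with a lifecycle configuration and a Git repository cloned at start, using boto3; the names, role ARN, and repository URL are placeholders, and the lifecycle configuration is assumed to already exist:

import boto3

sm = boto3.client("sagemaker")

sm.create_notebook_instance(
    NotebookInstanceName="demo-notebook",
    InstanceType="ml.t2.medium",
    RoleArn="arn:aws:iam::123456789012:role/service-role/SageMakerRole",  # placeholder
    LifecycleConfigName="install-extra-packages",                         # assumed to exist
    DefaultCodeRepository="https://github.com/example/demo-repo",         # public repo cloned on launch
)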



[Demo] Sagemaker Training

  • Algorithms
      • Create algorithm
      • Subscribe via the AWS Marketplace
  • Training Jobs (see the Estimator sketch below)
  • Hyperparameter Tuning Jobs
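
A minimal sketch of a training job for the built-in BlazingText algorithm using the SageMaker Python SDK (v2-style parameter names); the role ARN and S3 paths are placeholders:

import sagemaker
from sagemaker import image_uris
from sagemaker.estimator import Estimator

session = sagemaker.Session()
role = "arn:aws:iam::123456789012:role/service-role/SageMakerRole"  # placeholder

# Resolve the container image for the built-in BlazingText algorithm
image = image_uris.retrieve("blazingtext", session.boto_region_name)

estimator = Estimator(
    image_uri=image,
    role=role,
    instance_count=1,
    instance_type="ml.c4.xlarge",
    output_path="s3://my-bucket/blazingtext/output",   # placeholder bucket
    sagemaker_session=session,
)
estimator.set_hyperparameters(mode="skipgram")               # Word2Vec embeddings
estimator.fit({"train": "s3://my-bucket/blazingtext/train"}) # placeholder training data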



[Demo] Sagemaker Inference

  • Compilation jobs
  • Model packages
  • Models
  • Endpoint configurations
  • Endpoints (see the boto3 sketch below)
  • Batch transform jobs
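
A minimal boto3 sketch of how the inference resources above fit together: model, then endpoint configuration, then endpoint. The names, container image, model artifact, and role ARN are placeholders:

import boto3

sm = boto3.client("sagemaker")
role = "arn:aws:iam::123456789012:role/service-role/SageMakerRole"  # placeholder

# 1. Model: container image plus trained model artifact
sm.create_model(
    ModelName="demo-model",
    PrimaryContainer={
        "Image": "123456789012.dkr.ecr.us-east-1.amazonaws.com/demo-image:latest",
        "ModelDataUrl": "s3://my-bucket/model/model.tar.gz",
    },
    ExecutionRoleArn=role,
)

# 2. Endpoint configuration: instance type and count for hosting the model
sm.create_endpoint_config(
    EndpointConfigName="demo-endpoint-config",
    ProductionVariants=[{
        "VariantName": "AllTraffic",
        "ModelName": "demo-model",
        "InstanceType": "ml.m5.large",
        "InitialInstanceCount": 1,
    }],
)

# 3. Endpoint: the HTTPS endpoint that serves real-time predictions
sm.create_endpoint(EndpointName="demo-endpoint", EndpointConfigName="demo-endpoint-config")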

Built in Sagemaker Algorithms

Table of algorithms provided by Amazon Sagemaker


Sagemaker Built-in Algorithms—Examples


BlazingText

  • Unsupervised learning algorithm for generating Word2Vec embeddings
  • AWS blog post on BlazingText


DeepAR Forecasting

  • Supervised learning algorithm for forecasting scalar (that is, one-dimensional) time series using recurrent neural networks (RNNs)
  • DeepAR Documentation




**Built-in Sagemaker Algorithms Scale:**

“We recommend training k-means on CPU instances. You can train on GPU instances, but should limit GPU training to p*.xlarge instances because only one GPU per instance is used.”

  • County Census Notebook (see the KMeans sketch below)
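
A minimal sketch of following that guidance with the SageMaker Python SDK KMeans estimator on a CPU instance (v2-style parameter names); the role ARN, bucket, and random data are placeholders standing in for the census features:

import numpy as np
from sagemaker import KMeans

role = "arn:aws:iam::123456789012:role/service-role/SageMakerRole"  # placeholder

# Train k-means on a CPU instance, per the recommendation above
kmeans = KMeans(
    role=role,
    instance_count=1,
    instance_type="ml.c4.xlarge",                 # CPU instance
    k=10,
    output_path="s3://my-bucket/kmeans/output",   # placeholder bucket
)

train_data = np.random.rand(1000, 5).astype("float32")  # stand-in for real features
kmeans.fit(kmeans.record_set(train_data))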

8.2 Learn Sagemaker and EMR Integration




8.3 Learn Serverless Production Big Data Application Development

Source Code for Demo


Creating Timed Lambdas

Creating Serverless Data Pipeline Producers

Using AWS Lambda with Cloudwatch Events

You can create a CloudWatch Events timer to invoke a Lambda function on a schedule.

cloudwatch event lambda
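
A minimal boto3 sketch of that wiring: a scheduled CloudWatch Events rule, permission for the events service to invoke the function, and the Lambda as the rule target. The function name and ARN are placeholders:

import boto3

events = boto3.client("events")
awslambda = boto3.client("lambda")

lambda_arn = "arn:aws:lambda:us-east-1:123456789012:function:producer"  # placeholder

# 1. Scheduled rule that fires every minute
rule_arn = events.put_rule(
    Name="producer-timer",
    ScheduleExpression="rate(1 minute)",
    State="ENABLED",
)["RuleArn"]

# 2. Allow CloudWatch Events to invoke the function
awslambda.add_permission(
    FunctionName="producer",
    StatementId="producer-timer-invoke",
    Action="lambda:InvokeFunction",
    Principal="events.amazonaws.com",
    SourceArn=rule_arn,
)

# 3. Point the rule at the Lambda
events.put_targets(
    Rule="producer-timer",
    Targets=[{"Id": "producer", "Arn": lambda_arn}],
)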

Using AWS Cloudwatch logging with AWS Lambda

Using CloudWatch logging is an essential step in Lambda development.


Using AWS Lambda to populate AWS SQS (Simple Queuing Service)

  1. Create a new Lambda with the Serverless Wizard
  2. cd into the lambda directory and install packages one level up:

pip3 install boto3 --target ../
pip3 install python-json-logger --target ../

  3. Test locally
  4. Deploy

Dynamo to SQS

import boto3
import json
import sys
import os

DYNAMODB = boto3.resource('dynamodb')
TABLE = "fang"
QUEUE = "producer"
SQS = boto3.client("sqs")

#Setup structured JSON logging
import logging
from pythonjsonlogger import jsonlogger

LOG = logging.getLogger()
LOG.setLevel(logging.INFO)
logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
logHandler.setFormatter(formatter)
LOG.addHandler(logHandler)

def scan_table(table):
    """Scans table and return results"""

    LOG.info(f"Scanning Table {table}")
    producer_table = DYNAMODB.Table(table)
    response = producer_table.scan()
    items = response['Items']
    LOG.info(f"Found {len(items)} Items")
    return items

def send_sqs_msg(msg, queue_name, delay=0):
    """Send SQS Message

    Expects an SQS queue_name and msg in a dictionary format.
    Returns a response dictionary.
    """

    queue_url = SQS.get_queue_url(QueueName=queue_name)["QueueUrl"]
    queue_send_log_msg = "Send message to queue url: %s, with body: %s" %\
        (queue_url, msg)
    LOG.info(queue_send_log_msg)
    json_msg = json.dumps(msg)
    response = SQS.send_message(
        QueueUrl=queue_url,
        MessageBody=json_msg,
        DelaySeconds=delay)
    queue_send_log_msg_resp = "Message Response: %s for queue url: %s" %\
        (response, queue_url)
    LOG.info(queue_send_log_msg_resp)
    return response

def send_emissions(table, queue_name):
    """Send Emissions"""

    items = scan_table(table=table)
    for item in items:
        LOG.info(f"Sending item {item} to queue: {queue_name}")
        response = send_sqs_msg(item, queue_name=queue_name)

def lambda_handler(event, context):
    """Lambda entrypoint"""

    extra_logging = {"table": TABLE, "queue": QUEUE}
    LOG.info(f"event {event}, context {context}", extra=extra_logging)
    send_emissions(table=TABLE, queue_name=QUEUE)

Successful Local Test

test local

Verify Messages in SQS
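
One way to verify from code is to check the approximate message count on the queue; a minimal boto3 sketch using the producer queue name from the code above:

import boto3

sqs = boto3.client("sqs")
queue_url = sqs.get_queue_url(QueueName="producer")["QueueUrl"]

attrs = sqs.get_queue_attributes(
    QueueUrl=queue_url,
    AttributeNames=["ApproximateNumberOfMessages"],
)
print(attrs["Attributes"]["ApproximateNumberOfMessages"])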


Remote Test Needs Correct Role!!!

role failure

Wire up Cloudwatch Event Trigger

  1. Enable Timed Execution of producer
  2. Verify messages flowing into SQS

cloudwatch event trigger

SQS is populating


Creating Event Driven Lambdas

Triggering AWS Lambda with AWS SQS Events

The Lambda can now fire on SQS events.

SQS Trigger
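
The trigger shown in the console can also be expressed as an event source mapping; a minimal boto3 sketch, where the queue ARN and consumer function name are placeholders:

import boto3

awslambda = boto3.client("lambda")

awslambda.create_event_source_mapping(
    EventSourceArn="arn:aws:sqs:us-east-1:123456789012:producer",  # placeholder queue ARN
    FunctionName="sqs-consumer",                                   # placeholder function name
    BatchSize=1,
    Enabled=True,
)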

Reading AWS SQS Events from AWS Lambda

def lambda_handler(event, context):
    """Entry Point for Lambda"""

    LOG.info(f"SURVEYJOB LAMBDA, event {event}, context {context}")
    receipt_handle = event['Records'][0]['receiptHandle'] #sqs message
    #'eventSourceARN': 'arn:aws:sqs:us-east-1:561744971673:producer'
    event_source_arn = event['Records'][0]['eventSourceARN']
    names = [] #Captured from Queue

    # Process Queue
    for record in event['Records']:
        body = json.loads(record['body'])
        company_name = body['name']

        #Capture for processing
        names.append(company_name)
        extra_logging = {"body": body, "company_name": company_name}
        LOG.info(f"SQS CONSUMER LAMBDA, splitting sqs arn with value: {event_source_arn}",
            extra=extra_logging)
        qname = event_source_arn.split(":")[-1]
        extra_logging["queue"] = qname
        LOG.info(f"Attempting delete of SQS receiptHandle {receipt_handle} with queue_name {qname}",
            extra=extra_logging)
        res = delete_sqs_msg(queue_name=qname, receipt_handle=receipt_handle)
        LOG.info(f"Deleted SQS receipt_handle {receipt_handle} with res {res}", extra=extra_logging)

    # Make Pandas dataframe with wikipedia snippets
    LOG.info(f"Creating dataframe with values: {names}")
    df = names_to_wikipedia(names)

    # Perform Sentiment Analysis
    df = apply_sentiment(df)
    LOG.info(f"Sentiment from FANG companies: {df.to_dict()}")

    # Write result to S3
    write_s3(df=df, name=names.pop(), bucket="fangsentiment")
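
The handler above calls a delete_sqs_msg helper that is not shown; a minimal sketch of one possible implementation, assuming the same SQS client and logger setup as the producer:

import logging
import boto3

SQS = boto3.client("sqs")
LOG = logging.getLogger()

def delete_sqs_msg(queue_name, receipt_handle):
    """Delete a processed message so it is not redelivered"""

    queue_url = SQS.get_queue_url(QueueName=queue_name)["QueueUrl"]
    LOG.info(f"Deleting msg with ReceiptHandle {receipt_handle}")
    return SQS.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)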

Writing results to AWS S3

write dataframe to AWS S3

### S3
def write_s3(df, name, bucket):
    """Write DataFrame to S3 Bucket as CSV"""

    csv_buffer = StringIO()
    df.to_csv(csv_buffer)
    s3_resource = boto3.resource('s3')
    res = s3_resource.Object(bucket, f'{name}_sentiment.csv').\
        put(Body=csv_buffer.getvalue())
    LOG.info(f"result of write name: {name} to bucket: {bucket} with:\n {res}")

noah:/tmp $ aws s3 cp --recursive s3://fangsentiment/ .                                                                                                
download: s3://fangsentiment/netflix_sentiment.csv to ./netflix_sentiment.csv
download: s3://fangsentiment/google_sentiment.csv to ./google_sentiment.csv
download: s3://fangsentiment/facebook_sentiment.csv to ./facebook_sentiment.csv

8.4 Implement Containerization for Big Data


8.5 Implement Spot Instances for Big Data Pipeline

Real Massively Parallel Computer Vision Pipeline

Spot Pipeline

Spot Launcher

#!/usr/bin/env python
"""Launches a test spot instance"""

import click
import boto3
import base64

from sensible.loginit import logger
log = logger(__name__)

#Tell Boto3 To Enable Debug Logging
#boto3.set_stream_logger(name='botocore')

@click.group()
def cli():
    """Spot Launcher"""

def user_data_cmds(duration):
    """Initial cmds to run, takes duration for halt cmd"""

    cmds = """
        #cloud-config
        runcmd:
         - echo "halt" | at now + {duration} min
    """.format(duration=duration)
    return cmds

@cli.command("launch")
@click.option('--instance', default="r4.large", help='Instance Type')
@click.option('--duration', default="55", help='Duration')
@click.option('--keyname', default="pragai", help='Key Name')
@click.option('--profile', default="arn:aws:iam::561744971673:instance-profile/admin",
              help='IAM Instance Profile')
@click.option('--securitygroup', default="sg-61706e07", help='Security Group')
@click.option('--ami', default="ami-6df1e514", help='AMI')
def request_spot_instance(duration, instance, keyname,
                            profile, securitygroup, ami):
    """Request spot instance"""

    #import pdb;pdb.set_trace()
    user_data = user_data_cmds(duration)
    LaunchSpecifications = {
            "ImageId": ami,
            "InstanceType": instance,
            "KeyName": keyname,
            "IamInstanceProfile": {
                "Arn": profile
            },
            "UserData": base64.b64encode(user_data.encode("ascii")).\
                decode("ascii"),
            "BlockDeviceMappings": [
                {
                    "DeviceName": "/dev/xvda",
                    "Ebs": {
                        "DeleteOnTermination": True,
                        "VolumeType": "gp2",
                        "VolumeSize": 8,
                    }
                }
            ],
            "SecurityGroupIds": [securitygroup]
    }

    run_args = {
            'SpotPrice'           : "0.8",
            'Type'                : "one-time",
            'InstanceCount'       : 1,
            'LaunchSpecification' : LaunchSpecifications
    }

    msg_user_data = "SPOT REQUEST DATA: %s" % run_args
    log.info(msg_user_data)

    client = boto3.client('ec2', "us-west-2")
    reservation = client.request_spot_instances(**run_args)
    return reservation

if __name__ == '__main__':
    cli()

  • Spot Launch Demo and Walkthrough on Pricing (see the pricing sketch below)
  • Spot Instances EMR
  • Spot Instances AWS Batch
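
For the pricing walkthrough, a minimal boto3 sketch that pulls recent spot prices for the launcher's default instance type in us-west-2 (the region and instance type come from the script above):

import boto3

client = boto3.client("ec2", "us-west-2")

history = client.describe_spot_price_history(
    InstanceTypes=["r4.large"],
    ProductDescriptions=["Linux/UNIX"],
    MaxResults=10,
)
for price in history["SpotPriceHistory"]:
    print(price["AvailabilityZone"], price["SpotPrice"], price["Timestamp"])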