Lesson 8 Case Studies

Pragmatic AI Labs

This notebook was produced by Pragmatic AI Labs.

Lesson 8: Case Studies (60 min)

  • 8.1 Understand Big Data for SageMaker (12 min)
  • 8.2 Learn SageMaker and EMR Integration (12 min)
  • 8.3 Learn Serverless Production Big Data Application Development (12 min)
  • 8.4 Implement Containerization for Big Data (12 min)
  • 8.5 Implement Spot Instances for Big Data Pipeline (12 min)

8.1 Understand Big Data for SageMaker

Manage Machine Learning Experiments with Search

  • Finding training jobs
  • Rank training jobs
  • Tracing lineage of a model
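
The search features listed above correspond to the SageMaker `Search` API. Below is a minimal sketch, assuming boto3 is installed and AWS credentials are configured. The helper names and the metric name `validation:accuracy` are hypothetical; substitute a metric that your training jobs actually emit.

```python
def build_search_request(metric_name, max_results=10):
    """Build arguments for the SageMaker Search API.

    Finds completed training jobs and ranks them by a final metric value.
    """
    return {
        "Resource": "TrainingJob",
        "SearchExpression": {
            "Filters": [
                {
                    "Name": "TrainingJobStatus",
                    "Operator": "Equals",
                    "Value": "Completed",
                }
            ]
        },
        # Rank jobs by the chosen metric, best first
        "SortBy": f"Metrics.{metric_name}",
        "SortOrder": "Descending",
        "MaxResults": max_results,
    }


def find_best_training_jobs(metric_name, max_results=10):
    """Return training job names ranked by metric (needs AWS credentials)."""
    import boto3  # imported lazily so the request builder works offline

    client = boto3.client("sagemaker")
    response = client.search(**build_search_request(metric_name, max_results))
    return [r["TrainingJob"]["TrainingJobName"] for r in response["Results"]]
```

Each returned record also includes the job's input data, algorithm, and hyperparameters, which is the information used when tracing the lineage of a model.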

Ground Truth

ground_truth

  • Set up and manage labeling jobs
  • Uses active learning and human labeling
  • First 500 objects labeled per month are free

[Demo] Labeling Job

Notebook

notebooks

[Demo] SageMaker Notebooks

  • Create and run Jupyter Notebooks
      • Using Jupyter
      • Using JupyterLab
      • Using the terminal

  • Lifecycle configurations

  • Git Repositories
      • Public repositories can be cloned on Notebook launch

Training

training

[Demo] SageMaker Training

  • Algorithms
      • Create algorithm
      • Subscribe through AWS Marketplace

  • Training Jobs

  • HyperParameter Tuning Jobs

Inference

inference

[Demo] SageMaker Inference

  • Compilation jobs

  • Model packages

  • Models

  • Endpoint configurations

  • Endpoints

  • Batch transform jobs
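
Once a model is deployed behind an endpoint, it can be called through the SageMaker runtime client. The sketch below assumes an endpoint that accepts `text/csv` input, which depends on the algorithm. The endpoint name passed to `invoke` and both helper names are hypothetical.

```python
def to_csv_payload(rows):
    """Serialize rows of feature values as CSV, one record per line."""
    return "\n".join(",".join(str(value) for value in row) for row in rows)


def invoke(endpoint_name, rows):
    """Send rows to a deployed endpoint and return the raw response text."""
    import boto3  # imported lazily so the payload helper works offline

    runtime = boto3.client("sagemaker-runtime")
    response = runtime.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType="text/csv",
        Body=to_csv_payload(rows),
    )
    # The response body is a stream; read and decode it
    return response["Body"].read().decode("utf-8")
```

For large offline datasets, a batch transform job is usually a better fit than calling an endpoint record by record.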

Built-in SageMaker Algorithms

Table of algorithms provided by Amazon SageMaker

aws_algorithms

SageMaker Built-in Algorithms: Examples

BlazingText

  • unsupervised learning algorithm for generating Word2Vec embeddings.
  • aws blog post BlazingText

BlazingText

DeepAR Forecasting

  • supervised learning algorithm for forecasting scalar (that is, one-dimensional) time series using recurrent neural networks (RNN)
  • DeepAR Documentation

DeepAR

Demo

[Demo]

  • Built-in SageMaker algorithms scale:

“We recommend training k-means on CPU instances. You can train on GPU instances, but should limit GPU training to p*.xlarge instances because only one GPU per instance is used.”

  • County Census Notebook

8.2 Learn SageMaker and EMR Integration

kernel

Demo

EMR

8.3 Learn Serverless Production Big Data Application Development

Source Code for Demo

[Demo]

Creating Timed Lambdas

Creating Serverless Data Pipeline Producers

Using AWS Lambda with CloudWatch Events

A CloudWatch Events rule can act as a timer that invokes a Lambda function on a schedule.

cloudwatch event lambda
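
The timer can also be created from code. The sketch below assumes boto3 and sufficient IAM permissions. The helper names, the rule name, and the target `Id` are hypothetical; the three boto3 calls are `put_rule`, `add_permission`, and `put_targets`.

```python
def rate_expression(value, unit="minute"):
    """Build a CloudWatch Events rate expression, e.g. 'rate(5 minutes)'."""
    if unit not in ("minute", "hour", "day"):
        raise ValueError(f"unsupported unit: {unit}")
    if value < 1:
        raise ValueError("value must be a positive integer")
    # The unit is singular for a value of 1 and plural otherwise
    suffix = "" if value == 1 else "s"
    return f"rate({value} {unit}{suffix})"


def schedule_lambda(rule_name, function_name, function_arn, minutes=5):
    """Create a scheduled rule and point it at a Lambda function."""
    import boto3  # imported lazily so rate_expression works offline

    events = boto3.client("events")
    lambda_client = boto3.client("lambda")
    rule_arn = events.put_rule(
        Name=rule_name,
        ScheduleExpression=rate_expression(minutes),
        State="ENABLED",
    )["RuleArn"]
    # Allow CloudWatch Events to invoke the function
    lambda_client.add_permission(
        FunctionName=function_name,
        StatementId=f"{rule_name}-invoke",
        Action="lambda:InvokeFunction",
        Principal="events.amazonaws.com",
        SourceArn=rule_arn,
    )
    events.put_targets(Rule=rule_name, Targets=[{"Id": "1", "Arn": function_arn}])
    return rule_arn
```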

Using AWS CloudWatch logging with AWS Lambda

CloudWatch logging is an essential part of Lambda development: anything the function writes through the logging module ends up in CloudWatch Logs.

cloudwatch
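
Structured (JSON) log lines are easier to search in CloudWatch than free text. The lesson's code uses the `python-json-logger` package for this; the sketch below shows the same idea with only the standard library, so the `JsonFormatter` class and `make_logger` helper here are illustrative stand-ins, not that package's API.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Format log records as one JSON object per line."""

    def __init__(self, extra_keys=()):
        super().__init__()
        self.extra_keys = tuple(extra_keys)

    def format(self, record):
        payload = {"level": record.levelname, "message": record.getMessage()}
        # Copy selected fields passed through the `extra=` argument
        for key in self.extra_keys:
            if hasattr(record, key):
                payload[key] = getattr(record, key)
        return json.dumps(payload, default=str)


def make_logger(name, stream, extra_keys=()):
    """Return a logger that writes JSON lines to the given stream."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter(extra_keys))
    logger.handlers = [handler]
    return logger


# Log to an in-memory buffer so the output can be inspected
buffer = io.StringIO()
log = make_logger("producer-sketch", buffer, extra_keys=("table", "queue"))
log.info("scan complete", extra={"table": "fang", "queue": "producer"})
line = json.loads(buffer.getvalue())
```

In a Lambda function the stream would be left at its default, since Lambda forwards standard output and standard error to CloudWatch Logs.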

Using AWS Lambda to populate AWS SQS (Simple Queue Service)

  1. Create a new Lambda with the Serverless Wizard
  2. cd into the lambda directory and install packages one level up
pip3 install boto3 --target ../
pip3 install python-json-logger --target ../
  3. Test locally
  4. Deploy
"""
Dynamo to SQS
"""

import boto3
import json
import sys
import os

DYNAMODB = boto3.resource('dynamodb')
TABLE = "fang"
QUEUE = "producer"
SQS = boto3.client("sqs")

#SETUP LOGGING
import logging
from pythonjsonlogger import jsonlogger

LOG = logging.getLogger()
LOG.setLevel(logging.INFO)
logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
logHandler.setFormatter(formatter)
LOG.addHandler(logHandler)

def scan_table(table):
    """Scans table and return results"""
    
    LOG.info(f"Scanning Table {table}")
    producer_table = DYNAMODB.Table(table)
    response = producer_table.scan()
    items = response['Items']
    LOG.info(f"Found {len(items)} Items")
    return items

def send_sqs_msg(msg, queue_name, delay=0):
    """Send SQS Message

    Expects an SQS queue_name and msg in a dictionary format.
    Returns a response dictionary. 
    """

    queue_url = SQS.get_queue_url(QueueName=queue_name)["QueueUrl"]
    queue_send_log_msg = "Send message to queue url: %s, with body: %s" %\
        (queue_url, msg)
    LOG.info(queue_send_log_msg)
    json_msg = json.dumps(msg)
    response = SQS.send_message(
        QueueUrl=queue_url,
        MessageBody=json_msg,
        DelaySeconds=delay)
    queue_send_log_msg_resp = "Message Response: %s for queue url: %s" %\
        (response, queue_url) 
    LOG.info(queue_send_log_msg_resp)
    return response

def send_emissions(table, queue_name):
    """Send Emissions"""
    
    items = scan_table(table=table)
    for item in items:
        LOG.info(f"Sending item {item} to queue: {queue_name}")
        response = send_sqs_msg(item, queue_name=queue_name)
        LOG.debug(response)

def lambda_handler(event, context):
    """
    Lambda entrypoint
    """

    extra_logging = {"table": TABLE, "queue": QUEUE}
    LOG.info(f"event {event}, context {context}", extra=extra_logging)
    send_emissions(table=TABLE, queue_name=QUEUE)
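
A single DynamoDB `scan` call returns at most one page of results, so a larger table needs pagination. The sketch below is a hypothetical variant of `scan_table` that follows `LastEvaluatedKey` until the table is exhausted. It takes a table object (for example `DYNAMODB.Table("fang")`) rather than a table name, which is a design choice made here to keep it easy to test.

```python
def scan_all(table):
    """Scan every page of a DynamoDB table and return all items.

    `table` is any object with a boto3-style `scan(**kwargs)` method.
    """
    items = []
    kwargs = {}
    while True:
        response = table.scan(**kwargs)
        items.extend(response.get("Items", []))
        last_key = response.get("LastEvaluatedKey")
        if not last_key:
            # No more pages
            return items
        # Resume the scan where the previous page stopped
        kwargs["ExclusiveStartKey"] = last_key
```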


Successful Local Test

test local

Verify Messages in SQS

**SQS**

Remote test needs the correct IAM role

role failure
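
The remote failure typically means the Lambda execution role lacks permission for one of the calls the producer makes. Below is a sketch of a policy document covering the calls in the producer code (`scan`, `get_queue_url`, `send_message`) plus CloudWatch logging. The helper name is hypothetical and the ARNs use the placeholder account ID `123456789012`; replace them with your own resources.

```python
import json


def producer_policy(table_arn, queue_arn):
    """Build an IAM policy document for the DynamoDB-to-SQS producer."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": ["dynamodb:Scan"], "Resource": table_arn},
            {
                "Effect": "Allow",
                "Action": ["sqs:GetQueueUrl", "sqs:SendMessage"],
                "Resource": queue_arn,
            },
            {
                # Lets the function write its logs to CloudWatch
                "Effect": "Allow",
                "Action": [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                ],
                "Resource": "arn:aws:logs:*:*:*",
            },
        ],
    }


policy_json = json.dumps(
    producer_policy(
        "arn:aws:dynamodb:us-east-1:123456789012:table/fang",
        "arn:aws:sqs:us-east-1:123456789012:producer",
    ),
    indent=2,
)
```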

Wire up Cloudwatch Event Trigger

  1. Enable Timed Execution of producer
  2. Verify messages flowing into SQS

cloudwatch event trigger

SQS is populating

alt text

Creating Event Driven Lambdas

Triggering AWS Lambda with AWS SQS Events

Lambda can now be triggered directly by SQS events

SQS Trigger
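
When SQS triggers a Lambda, the handler receives a batch of records, each with a `body`, a `receiptHandle`, and the `eventSourceARN` of the queue. The sketch below builds a simplified event for local testing. It includes only the fields the consumer in this lesson reads, and the helper names, message IDs, and placeholder account ID are hypothetical.

```python
import json


def make_sqs_event(bodies, queue_arn):
    """Build a simplified SQS-style Lambda event from message bodies."""
    return {
        "Records": [
            {
                "messageId": f"msg-{index}",
                "receiptHandle": f"handle-{index}",
                "body": json.dumps(body),  # SQS delivers the body as a string
                "eventSource": "aws:sqs",
                "eventSourceARN": queue_arn,
            }
            for index, body in enumerate(bodies)
        ]
    }


def extract_names(event):
    """Pull the 'name' field out of every record body."""
    return [json.loads(record["body"])["name"] for record in event["Records"]]


event = make_sqs_event(
    [{"name": "facebook"}, {"name": "netflix"}],
    "arn:aws:sqs:us-east-1:123456789012:producer",
)
names = extract_names(event)
# The queue name is the last segment of the ARN
queue_name = event["Records"][0]["eventSourceARN"].split(":")[-1]
```

An event like this can be passed straight to a handler as `lambda_handler(event, None)` to exercise the parsing logic without deploying.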

Reading AWS SQS Events from AWS Lambda

def lambda_handler(event, context):
    """Entry Point for Lambda"""

    LOG.info(f"SURVEYJOB LAMBDA, event {event}, context {context}")
    # The queue name is the last segment of the event source ARN, for example
    # 'arn:aws:sqs:us-east-1:561744971673:producer'
    event_source_arn = event['Records'][0]['eventSourceARN']
    qname = event_source_arn.split(":")[-1]
    
    names = [] #Captured from Queue
    
    # Process Queue
    for record in event['Records']:
        # Each record has its own receipt handle; use it (not the first
        # record's handle) when deleting that message
        receipt_handle = record['receiptHandle']
        body = json.loads(record['body'])
        company_name = body['name']
        
        #Capture for processing
        names.append(company_name)
        
        extra_logging = {"body": body, "company_name": company_name, "queue": qname}
        LOG.info(f"SQS CONSUMER LAMBDA, splitting sqs arn with value: {event_source_arn}", extra=extra_logging)
        LOG.info(f"Attempting to delete SQS receiptHandle {receipt_handle} with queue_name {qname}", extra=extra_logging)
        res = delete_sqs_msg(queue_name=qname, receipt_handle=receipt_handle)
        LOG.info(f"Deleted SQS receipt_handle {receipt_handle} with res {res}", extra=extra_logging)
    
    # Make Pandas dataframe with wikipedia snippets
    LOG.info(f"Creating dataframe with values: {names}")
    df = names_to_wikipedia(names)
    
    # Perform Sentiment Analysis
    df = apply_sentiment(df)
    LOG.info(f"Sentiment from FANG companies: {df.to_dict()}")
    
    # Write result to S3
    write_s3(df=df, name=names.pop(), bucket="fangsentiment")

Writing results to AWS S3

write dataframe to AWS S3

### S3
from io import StringIO  # in-memory text buffer that holds the CSV

def write_s3(df, name, bucket):
    """Write a dataframe to an S3 bucket as <name>_sentiment.csv"""

    csv_buffer = StringIO()
    df.to_csv(csv_buffer)
    s3_resource = boto3.resource('s3')
    res = s3_resource.Object(bucket, f'{name}_sentiment.csv').\
        put(Body=csv_buffer.getvalue())
    LOG.info(f"result of write name: {name} to bucket: {bucket} with:\n {res}")

noah:/tmp $ aws s3 cp --recursive s3://fangsentiment/ .                                                                                                
download: s3://fangsentiment/netflix_sentiment.csv to ./netflix_sentiment.csv
download: s3://fangsentiment/google_sentiment.csv to ./google_sentiment.csv
download: s3://fangsentiment/facebook_sentiment.csv to ./facebook_sentiment.csv

8.4 Implement Containerization for Big Data

[Demo]

8.5 Implement Spot Instances for Big Data Pipeline

Real Massively Parallel Computer Vision Pipeline

Spot Pipeline

Spot Launcher

#!/usr/bin/env python
"""Launches a test spot instance"""

import click
import boto3
import base64

from sensible.loginit import logger
log = logger(__name__)

#Tell Boto3 To Enable Debug Logging
#boto3.set_stream_logger(name='botocore')

@click.group()
def cli():
    """Spot Launcher"""


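def user_data_cmds(duration):
    """Initial cmds to run, takes duration for halt cmd"""

    # cloud-init only treats user data as cloud-config when the very first
    # line is "#cloud-config", so build the text without leading whitespace
    cmds = (
        "#cloud-config\n"
        "runcmd:\n"
        f' - echo "halt" | at now + {duration} min\n'
    )
    return cmds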

@cli.command("launch")
@click.option('--instance', default="r4.large", help='Instance Type')
@click.option('--duration', default="55", help='Duration')
@click.option('--keyname', default="pragai", help='Key Name')
@click.option('--profile', default="arn:aws:iam::561744971673:instance-profile/admin",
                     help='IamInstanceProfile')
@click.option('--securitygroup', default="sg-61706e07", help='Security Group Id')
@click.option('--ami', default="ami-6df1e514", help='AMI Id')
def request_spot_instance(duration, instance, keyname, 
                            profile, securitygroup, ami):
    """Request spot instance"""

    #import pdb;pdb.set_trace()
    user_data = user_data_cmds(duration)
    LaunchSpecifications = {
            "ImageId": ami,
            "InstanceType": instance,
            "KeyName": keyname,
            "IamInstanceProfile": {
                "Arn": profile
            },
            "UserData": base64.b64encode(user_data.encode("ascii")).\
                decode('ascii'),
            "BlockDeviceMappings": [
                {
                    "DeviceName": "/dev/xvda",
                    "Ebs": {
                        "DeleteOnTermination": True,
                        "VolumeType": "gp2",
                        "VolumeSize": 8,
                    }
                }
            ],
            "SecurityGroupIds": [securitygroup]
        }

    run_args = {
            'SpotPrice'           : "0.8",
            'Type'                : "one-time",
            'InstanceCount'       : 1,
            'LaunchSpecification' : LaunchSpecifications
        }

    msg_user_data = "SPOT REQUEST DATA: %s" % run_args
    log.info(msg_user_data)

    client = boto3.client('ec2', "us-west-2")
    reservation = client.request_spot_instances(**run_args)
    return reservation

if __name__ == '__main__':
    cli()
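
The launcher returns as soon as the spot request is submitted, not when an instance is running. Below is a sketch of one way to wait for fulfilment, assuming boto3 and the same `us-west-2` region as the launcher. The helper names are hypothetical; the waiter name `spot_instance_request_fulfilled` and `describe_spot_instance_requests` are boto3 EC2 client features.

```python
def request_ids(reservation):
    """Extract spot request ids from a request_spot_instances response."""
    return [
        request["SpotInstanceRequestId"]
        for request in reservation["SpotInstanceRequests"]
    ]


def wait_for_instances(reservation, region="us-west-2"):
    """Block until the spot requests are fulfilled; return instance ids."""
    import boto3  # imported lazily so request_ids works offline

    client = boto3.client("ec2", region)
    ids = request_ids(reservation)
    # Poll until every request has been fulfilled (or the waiter times out)
    waiter = client.get_waiter("spot_instance_request_fulfilled")
    waiter.wait(SpotInstanceRequestIds=ids)
    described = client.describe_spot_instance_requests(SpotInstanceRequestIds=ids)
    return [request["InstanceId"] for request in described["SpotInstanceRequests"]]
```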


Demo

  • Spot Launch Demo and Walkthrough on Pricing
  • Spot Instances EMR
  • Spot Instances AWS Batch
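
The pricing walkthrough comes down to simple arithmetic: compare the spot price to the on-demand price and multiply by the time the instances run. The sketch below uses hypothetical helper names, and any prices you pass in are your own inputs, not AWS figures. The optional `recent_spot_price` helper assumes boto3 and credentials, and calls `describe_spot_price_history`.

```python
def spot_savings(on_demand_hourly, spot_hourly):
    """Return the percentage saved by using spot instead of on-demand."""
    if on_demand_hourly <= 0:
        raise ValueError("on_demand_hourly must be positive")
    return round((1 - spot_hourly / on_demand_hourly) * 100, 1)


def estimated_cost(hourly_price, minutes, instance_count=1):
    """Estimate the cost of running instances for a number of minutes."""
    return round(hourly_price * (minutes / 60) * instance_count, 4)


def recent_spot_price(instance_type, region="us-west-2"):
    """Fetch one recent spot price record (needs AWS credentials)."""
    import boto3  # imported lazily so the arithmetic helpers work offline

    client = boto3.client("ec2", region)
    history = client.describe_spot_price_history(
        InstanceTypes=[instance_type],
        ProductDescriptions=["Linux/UNIX"],
        MaxResults=1,
    )
    return float(history["SpotPriceHistory"][0]["SpotPrice"])
```

For example, `spot_savings(1.0, 0.25)` reports the saving for a hypothetical instance that costs 1.00 per hour on demand and 0.25 per hour on spot.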