
Lesson 8 Case Studies

Pragmatic AI Labs


This notebook was produced by Pragmatic AI Labs. You can continue learning about these topics with the lesson outline below:

Lesson 8: Case Studies (60 min)

  • 8.1 Understand Big Data for Sagemaker (12 min)
  • 8.2 Learn Sagemaker and EMR Integration (12 min)
  • 8.3 Learn Serverless Production Big Data Application Development (12 min)
  • 8.4 Implement Containerization for Big Data (12 min)
  • 8.5 Implement Spot Instances for Big Data Pipeline (12 min)

8.1 Understand Big Data for Sagemaker

Manage Machine Learning Experiments with Search

  • Find training jobs (see the search sketch below)
  • Rank training jobs
  • Trace the lineage of a model
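
A minimal sketch of the search workflow above using the boto3 SageMaker Search API; the job-name filter value is a hypothetical placeholder:

import boto3

sm = boto3.client("sagemaker")

# Find completed training jobs whose name matches a (hypothetical) experiment prefix,
# newest first; filtering and sorting is how jobs are found and ranked
results = sm.search(
    Resource="TrainingJob",
    SearchExpression={
        "Filters": [
            {"Name": "TrainingJobName", "Operator": "Contains", "Value": "kmeans-census"},
            {"Name": "TrainingJobStatus", "Operator": "Equals", "Value": "Completed"},
        ]
    },
    SortBy="CreationTime",
    SortOrder="Descending",
    MaxResults=10,
)

for result in results["Results"]:
    job = result["TrainingJob"]
    print(job["TrainingJobName"], job["TrainingJobStatus"])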

Ground Truth


  • Set up and manage labeling jobs (see the inspection sketch below)
  • Uses active learning and human labeling
  • First 500 objects labeled per month are free
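
A minimal sketch of inspecting labeling jobs with boto3; the job name passed to describe_labeling_job is a hypothetical placeholder:

import boto3

sm = boto3.client("sagemaker")

# List recent labeling jobs and their status
for job in sm.list_labeling_jobs(MaxResults=10)["LabelingJobSummaryList"]:
    print(job["LabelingJobName"], job["LabelingJobStatus"])

# Drill into one job to see labeled/unlabeled object counts (name is a placeholder)
counters = sm.describe_labeling_job(LabelingJobName="my-labeling-job")["LabelCounters"]
print(counters)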

[Demo] Labeling Job



[Demo] Sagemaker Notebooks

  • Create and run Jupyter Notebooks
      • Using Jupyter
      • Using JupyterLab
      • Using the terminal
  • Lifecycle configurations
  • Git repositories
      • Public repositories can be cloned on Notebook launch (see the boto3 sketch below)
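
A minimal sketch of launching a notebook instance with a lifecycle configuration and a Git repository cloned at start, using boto3; the names, role ARN, and repository URL are placeholders, and the lifecycle configuration is assumed to already exist:

import boto3

sm = boto3.client("sagemaker")

sm.create_notebook_instance(
    NotebookInstanceName="demo-notebook",
    InstanceType="ml.t2.medium",
    RoleArn="arn:aws:iam::123456789012:role/service-role/SageMakerRole",  # placeholder
    LifecycleConfigName="install-extra-packages",                         # assumed to exist
    DefaultCodeRepository="https://github.com/example/demo-repo",         # public repo cloned on launch
)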



[Demo] Sagemaker Training

  • Algorithms
      • Create algorithm
      • Subscribe via the AWS Marketplace
  • Training Jobs (see the Estimator sketch below)
  • Hyperparameter Tuning Jobs
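
A minimal sketch of a training job for the built-in BlazingText algorithm using the SageMaker Python SDK (v2-style parameter names); the role ARN and S3 paths are placeholders:

import sagemaker
from sagemaker import image_uris
from sagemaker.estimator import Estimator

session = sagemaker.Session()
role = "arn:aws:iam::123456789012:role/service-role/SageMakerRole"  # placeholder

# Resolve the container image for the built-in BlazingText algorithm
image = image_uris.retrieve("blazingtext", session.boto_region_name)

estimator = Estimator(
    image_uri=image,
    role=role,
    instance_count=1,
    instance_type="ml.c4.xlarge",
    output_path="s3://my-bucket/blazingtext/output",   # placeholder bucket
    sagemaker_session=session,
)
estimator.set_hyperparameters(mode="skipgram")               # Word2Vec embeddings
estimator.fit({"train": "s3://my-bucket/blazingtext/train"}) # placeholder training data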



[Demo] Sagemaker Inference

  • Compilation jobs
  • Model packages
  • Models
  • Endpoint configurations
  • Endpoints (see the boto3 sketch below)
  • Batch transform jobs
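
A minimal boto3 sketch of how the inference resources above fit together: model, then endpoint configuration, then endpoint. The names, container image, model artifact, and role ARN are placeholders:

import boto3

sm = boto3.client("sagemaker")
role = "arn:aws:iam::123456789012:role/service-role/SageMakerRole"  # placeholder

# 1. Model: container image plus trained model artifact
sm.create_model(
    ModelName="demo-model",
    PrimaryContainer={
        "Image": "123456789012.dkr.ecr.us-east-1.amazonaws.com/demo-image:latest",
        "ModelDataUrl": "s3://my-bucket/model/model.tar.gz",
    },
    ExecutionRoleArn=role,
)

# 2. Endpoint configuration: instance type and count for hosting the model
sm.create_endpoint_config(
    EndpointConfigName="demo-endpoint-config",
    ProductionVariants=[{
        "VariantName": "AllTraffic",
        "ModelName": "demo-model",
        "InstanceType": "ml.m5.large",
        "InitialInstanceCount": 1,
    }],
)

# 3. Endpoint: the HTTPS endpoint that serves real-time predictions
sm.create_endpoint(EndpointName="demo-endpoint", EndpointConfigName="demo-endpoint-config")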

Built in Sagemaker Algorithms

Table of algorithms provided by Amazon Sagemaker


Sagemaker Built-in Algorithms—Examples


BlazingText

  • Unsupervised learning algorithm for generating Word2Vec embeddings
  • AWS blog post on BlazingText


DeepAR Forecasting

  • Supervised learning algorithm for forecasting scalar (that is, one-dimensional) time series using recurrent neural networks (RNNs)
  • DeepAR Documentation




**Built-in Sagemaker Algorithms Scale:**

“We recommend training k-means on CPU instances. You can train on GPU instances, but should limit GPU training to p*.xlarge instances because only one GPU per instance is used.”

  • County Census Notebook (see the KMeans sketch below)
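
A minimal sketch of following that guidance with the SageMaker Python SDK KMeans estimator on a CPU instance (v2-style parameter names); the role ARN, bucket, and random data are placeholders standing in for the census features:

import numpy as np
from sagemaker import KMeans

role = "arn:aws:iam::123456789012:role/service-role/SageMakerRole"  # placeholder

# Train k-means on a CPU instance, per the recommendation above
kmeans = KMeans(
    role=role,
    instance_count=1,
    instance_type="ml.c4.xlarge",                 # CPU instance
    k=10,
    output_path="s3://my-bucket/kmeans/output",   # placeholder bucket
)

train_data = np.random.rand(1000, 5).astype("float32")  # stand-in for real features
kmeans.fit(kmeans.record_set(train_data))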

8.2 Learn Sagemaker and EMR Integration




8.3 Learn Serverless Production Big Data Application Development

Source Code for Demo


Creating Timed Lambdas

Creating Serverless Data Pipeline Producers

Using AWS Lambda with Cloudwatch Events

You can create a CloudWatch Events timer to invoke a Lambda function on a schedule.

cloudwatch event lambda
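
A minimal boto3 sketch of that wiring: a scheduled CloudWatch Events rule, permission for the events service to invoke the function, and the Lambda as the rule target. The function name and ARN are placeholders:

import boto3

events = boto3.client("events")
awslambda = boto3.client("lambda")

lambda_arn = "arn:aws:lambda:us-east-1:123456789012:function:producer"  # placeholder

# 1. Scheduled rule that fires every minute
rule_arn = events.put_rule(
    Name="producer-timer",
    ScheduleExpression="rate(1 minute)",
    State="ENABLED",
)["RuleArn"]

# 2. Allow CloudWatch Events to invoke the function
awslambda.add_permission(
    FunctionName="producer",
    StatementId="producer-timer-invoke",
    Action="lambda:InvokeFunction",
    Principal="events.amazonaws.com",
    SourceArn=rule_arn,
)

# 3. Point the rule at the Lambda
events.put_targets(
    Rule="producer-timer",
    Targets=[{"Id": "producer", "Arn": lambda_arn}],
)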

Using AWS Cloudwatch logging with AWS Lambda

Using CloudWatch logging is an essential step in Lambda development.


Using AWS Lambda to populate AWS SQS (Simple Queuing Service)

  1. Create a new Lambda with the Serverless Wizard
  2. cd into the lambda directory and install packages one level up:

pip3 install boto3 --target ../
pip3 install python-json-logger --target ../

  3. Test locally
  4. Deploy

Dynamo to SQS

import boto3
import json
import sys
import os

DYNAMODB = boto3.resource('dynamodb')
TABLE = "fang"
QUEUE = "producer"
SQS = boto3.client("sqs")

#Setup structured JSON logging
import logging
from pythonjsonlogger import jsonlogger

LOG = logging.getLogger()
LOG.setLevel(logging.INFO)
logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
logHandler.setFormatter(formatter)
LOG.addHandler(logHandler)

def scan_table(table):
    """Scans table and return results"""

    LOG.info(f"Scanning Table {table}")
    producer_table = DYNAMODB.Table(table)
    response = producer_table.scan()
    items = response['Items']
    LOG.info(f"Found {len(items)} Items")
    return items

def send_sqs_msg(msg, queue_name, delay=0):
    """Send SQS Message

    Expects an SQS queue_name and msg in a dictionary format.
    Returns a response dictionary.
    """

    queue_url = SQS.get_queue_url(QueueName=queue_name)["QueueUrl"]
    queue_send_log_msg = "Send message to queue url: %s, with body: %s" %\
        (queue_url, msg)
    LOG.info(queue_send_log_msg)
    json_msg = json.dumps(msg)
    response = SQS.send_message(
        QueueUrl=queue_url,
        MessageBody=json_msg,
        DelaySeconds=delay)
    queue_send_log_msg_resp = "Message Response: %s for queue url: %s" %\
        (response, queue_url)
    LOG.info(queue_send_log_msg_resp)
    return response

def send_emissions(table, queue_name):
    """Send Emissions"""

    items = scan_table(table=table)
    for item in items:
        LOG.info(f"Sending item {item} to queue: {queue_name}")
        response = send_sqs_msg(item, queue_name=queue_name)

def lambda_handler(event, context):
    """Lambda entrypoint"""

    extra_logging = {"table": TABLE, "queue": QUEUE}
    LOG.info(f"event {event}, context {context}", extra=extra_logging)
    send_emissions(table=TABLE, queue_name=QUEUE)

Successful Local Test

test local

Verify Messages in SQS
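
One way to verify from code is to check the approximate message count on the queue; a minimal boto3 sketch using the producer queue name from the code above:

import boto3

sqs = boto3.client("sqs")
queue_url = sqs.get_queue_url(QueueName="producer")["QueueUrl"]

attrs = sqs.get_queue_attributes(
    QueueUrl=queue_url,
    AttributeNames=["ApproximateNumberOfMessages"],
)
print(attrs["Attributes"]["ApproximateNumberOfMessages"])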


Remote Test Needs Correct Role!!!

role failure

Wire up Cloudwatch Event Trigger

  1. Enable Timed Execution of producer
  2. Verify messages flowing into SQS

cloudwatch event trigger

SQS is populating


Creating Event Driven Lambdas

Triggering AWS Lambda with AWS SQS Events

The Lambda can now fire on SQS events.

SQS Trigger
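
The trigger shown in the console can also be expressed as an event source mapping; a minimal boto3 sketch, where the queue ARN and consumer function name are placeholders:

import boto3

awslambda = boto3.client("lambda")

awslambda.create_event_source_mapping(
    EventSourceArn="arn:aws:sqs:us-east-1:123456789012:producer",  # placeholder queue ARN
    FunctionName="sqs-consumer",                                   # placeholder function name
    BatchSize=1,
    Enabled=True,
)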

Reading AWS SQS Events from AWS Lambda

def lambda_handler(event, context):
    """Entry Point for Lambda"""

    LOG.info(f"SURVEYJOB LAMBDA, event {event}, context {context}")
    receipt_handle = event['Records'][0]['receiptHandle'] #sqs message
    #'eventSourceARN': 'arn:aws:sqs:us-east-1:561744971673:producer'
    event_source_arn = event['Records'][0]['eventSourceARN']
    names = [] #Captured from Queue

    # Process Queue
    for record in event['Records']:
        body = json.loads(record['body'])
        company_name = body['name']

        #Capture for processing
        names.append(company_name)
        extra_logging = {"body": body, "company_name": company_name}
        LOG.info(f"SQS CONSUMER LAMBDA, splitting sqs arn with value: {event_source_arn}",
            extra=extra_logging)
        qname = event_source_arn.split(":")[-1]
        extra_logging["queue"] = qname
        LOG.info(f"Attempting delete of SQS receiptHandle {receipt_handle} with queue_name {qname}",
            extra=extra_logging)
        res = delete_sqs_msg(queue_name=qname, receipt_handle=receipt_handle)
        LOG.info(f"Deleted SQS receipt_handle {receipt_handle} with res {res}", extra=extra_logging)

    # Make Pandas dataframe with wikipedia snippets
    LOG.info(f"Creating dataframe with values: {names}")
    df = names_to_wikipedia(names)

    # Perform Sentiment Analysis
    df = apply_sentiment(df)
    LOG.info(f"Sentiment from FANG companies: {df.to_dict()}")

    # Write result to S3
    write_s3(df=df, name=names.pop(), bucket="fangsentiment")
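
The handler above calls a delete_sqs_msg helper that is not shown; a minimal sketch of one possible implementation, assuming the same SQS client and logger setup as the producer:

import logging
import boto3

SQS = boto3.client("sqs")
LOG = logging.getLogger()

def delete_sqs_msg(queue_name, receipt_handle):
    """Delete a processed message so it is not redelivered"""

    queue_url = SQS.get_queue_url(QueueName=queue_name)["QueueUrl"]
    LOG.info(f"Deleting msg with ReceiptHandle {receipt_handle}")
    return SQS.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)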

Writing results to AWS S3

write dataframe to AWS S3

### S3
def write_s3(df, name, bucket):
    """Write DataFrame to S3 Bucket as CSV"""

    csv_buffer = StringIO()
    df.to_csv(csv_buffer)
    s3_resource = boto3.resource('s3')
    res = s3_resource.Object(bucket, f'{name}_sentiment.csv').\
        put(Body=csv_buffer.getvalue())
    LOG.info(f"result of write name: {name} to bucket: {bucket} with:\n {res}")

noah:/tmp $ aws s3 cp --recursive s3://fangsentiment/ .                                                                                                
download: s3://fangsentiment/netflix_sentiment.csv to ./netflix_sentiment.csv
download: s3://fangsentiment/google_sentiment.csv to ./google_sentiment.csv
download: s3://fangsentiment/facebook_sentiment.csv to ./facebook_sentiment.csv

8.4 Implement Containerization for Big Data


8.5 Implement Spot Instances for Big Data Pipeline

Real Massively Parallel Computer Vision Pipeline

Spot Pipeline

Spot Launcher

#!/usr/bin/env python
"""Launches a test spot instance"""

import click
import boto3
import base64

from sensible.loginit import logger
log = logger(__name__)

#Tell Boto3 To Enable Debug Logging
#boto3.set_stream_logger(name='botocore')

@click.group()
def cli():
    """Spot Launcher"""

def user_data_cmds(duration):
    """Initial cmds to run, takes duration for halt cmd"""

    cmds = """
        #cloud-config
        runcmd:
         - echo "halt" | at now + {duration} min
    """.format(duration=duration)
    return cmds

@cli.command("launch")
@click.option('--instance', default="r4.large", help='Instance Type')
@click.option('--duration', default="55", help='Duration')
@click.option('--keyname', default="pragai", help='Key Name')
@click.option('--profile', default="arn:aws:iam::561744971673:instance-profile/admin",
              help='IAM Instance Profile')
@click.option('--securitygroup', default="sg-61706e07", help='Security Group')
@click.option('--ami', default="ami-6df1e514", help='AMI')
def request_spot_instance(duration, instance, keyname,
                            profile, securitygroup, ami):
    """Request spot instance"""

    #import pdb;pdb.set_trace()
    user_data = user_data_cmds(duration)
    LaunchSpecifications = {
            "ImageId": ami,
            "InstanceType": instance,
            "KeyName": keyname,
            "IamInstanceProfile": {
                "Arn": profile
            },
            "UserData": base64.b64encode(user_data.encode("ascii")).\
                decode("ascii"),
            "BlockDeviceMappings": [
                {
                    "DeviceName": "/dev/xvda",
                    "Ebs": {
                        "DeleteOnTermination": True,
                        "VolumeType": "gp2",
                        "VolumeSize": 8,
                    }
                }
            ],
            "SecurityGroupIds": [securitygroup]
    }

    run_args = {
            'SpotPrice'           : "0.8",
            'Type'                : "one-time",
            'InstanceCount'       : 1,
            'LaunchSpecification' : LaunchSpecifications
    }

    msg_user_data = "SPOT REQUEST DATA: %s" % run_args
    log.info(msg_user_data)

    client = boto3.client('ec2', "us-west-2")
    reservation = client.request_spot_instances(**run_args)
    return reservation

if __name__ == '__main__':
    cli()

  • Spot Launch Demo and Walkthrough on Pricing (see the pricing sketch below)
  • Spot Instances EMR
  • Spot Instances AWS Batch
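
For the pricing walkthrough, a minimal boto3 sketch that pulls recent spot prices for the launcher's default instance type in us-west-2 (the region and instance type come from the script above):

import boto3

client = boto3.client("ec2", "us-west-2")

history = client.describe_spot_price_history(
    InstanceTypes=["r4.large"],
    ProductDescriptions=["Linux/UNIX"],
    MaxResults=10,
)
for price in history["SpotPriceHistory"]:
    print(price["AvailabilityZone"], price["SpotPrice"], price["Timestamp"])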