Lesson 2: Collection
Pragmatic AI Labs
This notebook was produced by Pragmatic AI Labs. You can continue learning about these topics by:
- Buying a copy of Pragmatic AI: An Introduction to Cloud-Based Machine Learning from Informit.
- Buying a copy of Pragmatic AI: An Introduction to Cloud-Based Machine Learning from Amazon.
- Reading an online copy of Pragmatic AI: An Introduction to Cloud-Based Machine Learning.
- Watching the video Essential Machine Learning and AI with Python and Jupyter Notebook on Safari Books Online.
- Watching the video AWS Certified Machine Learning-Specialty.
- Purchasing the video Essential Machine Learning and AI with Python and Jupyter Notebook.
- Viewing more content at noahgift.com
Lesson 2.1 Determine the operational characteristics of the collection system
Data Ingestion Concepts
Data Lakes
Central Repository for all data at any scale
AWS Lake Formation
- New service announced at re:Invent 2018
- Build a secure lake in days…not months
- Enforce security policies
- Gain and manage insights
AWS Batch (BATCH)
An example use case is financial services trade analysis.
Using AWS Batch for ML Jobs
https://aws.amazon.com/batch/
Example submissions tool
import click

# `cli` is the top-level click group and `submit_job` is a boto3-based
# helper; both are defined elsewhere in the tool (a sketch of submit_job
# follows below).
@cli.group()
def run():
    """Run AWS Batch"""

@run.command("submit")
@click.option("--queue", default="first-run-job-queue", help="Batch Queue")
@click.option("--jobname", default="1", help="Name of Job")
@click.option("--jobdef", default="test", help="Job Definition")
@click.option("--cmd", default=["uname"], help="Container Override Commands")
def submit(queue, jobname, jobdef, cmd):
    """Submit a job"""

    result = submit_job(
        job_name=jobname,
        job_queue=queue,
        job_definition=jobdef,
        command=cmd
    )
    click.echo("CLI: Run Job Called")
    return result
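The submit_job helper called above is not shown in this snippet. A minimal sketch of what it might look like using boto3's Batch client (the function body here is an assumption, not the course repo's exact implementation):

import boto3

def submit_job(job_name, job_queue, job_definition, command):
    """Submit a container job to AWS Batch (sketch of the helper above)."""

    batch = boto3.client("batch")
    # containerOverrides replaces the job definition's default command
    response = batch.submit_job(
        jobName=job_name,
        jobQueue=job_queue,
        jobDefinition=job_definition,
        containerOverrides={"command": command}
    )
    return response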
Lambda (EVENTS)
- Serverless
- Used in most, if not all, ML platforms:
  - DeepLens
  - SageMaker
  - S3 Events
Getting started with AWS Lambda development in Python using Chalice
Demo on SageMaker terminal
https://github.com/aws/chalice
Hello World Example:
$ pip install chalice
$ chalice new-project helloworld && cd helloworld
$ cat app.py
from chalice import Chalice

app = Chalice(app_name="helloworld")

@app.route("/")
def index():
    return {"hello": "world"}

$ chalice deploy
...
https://endpoint/api
$ curl https://endpoint/api
{"hello": "world"}
References:
Serverless Web Scraping Project
[Demo] Deploying Hello World Lambda Function
Using AWS Step Functions
https://aws.amazon.com/step-functions/
Example Project:
https://github.com/noahgift/web_scraping_python
[Demo] Step Function
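Once a state machine is defined, it can be started from Python with boto3. A minimal sketch, assuming a deployed state machine (the ARN and input below are placeholders):

import json
import boto3

sfn = boto3.client("stepfunctions")

# Placeholder ARN; substitute the ARN of a state machine in your account
response = sfn.start_execution(
    stateMachineArn="arn:aws:states:us-east-1:123456789012:stateMachine:scrape",
    input=json.dumps({"url": "https://example.com"})
)
print(response["executionArn"])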
Lesson 2.2 Select a collection system that handles the frequency of data change and type of data being ingested
Lesson 2.3 Identify the properties that need to be enforced by the collection system: order, data structure, metadata, etc.
Lesson 2.4 Explain the durability and availability characteristics for the collection approach
Lesson 2.5 Learn AWS Kinesis Streams
Kinesis Ad Tech Pipeline
Kinesis IoT
Kinesis (STREAMING)
Solves Three Key Problems
- Time-series Analytics
- Real-time Dashboards
- Real-time Metrics
Kinesis Analytics Workflow
Kinesis Real-Time Log Analytics Example
[Demo] Kinesis
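A producer writes records to a Kinesis stream with put_record. A minimal sketch, assuming a stream already exists (the stream name below is a placeholder):

import json
import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")

# "my-stream" is a placeholder; create it first or use an existing stream
response = kinesis.put_record(
    StreamName="my-stream",
    Data=json.dumps({"metric": "page_views", "value": 1}),
    PartitionKey="page_views"  # the partition key maps the record to a shard
)
print(response["SequenceNumber"])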
Lesson 2.6 Learn AWS Kinesis Firehose
Kinesis Firehose Features
- Processes Data in Real-Time
- Can process hundreds of TBs an hour
- Example inputs:
  - logs
  - financial transactions
  - streaming data
!pip install -q sensible
import asyncio
import datetime
import json
import time
import uuid

import boto3
from sensible.loginit import logger

LOG = logger(__name__)

def firehose_client(region_name="us-east-1"):
    """Kinesis Firehose client"""

    firehose_conn = boto3.client("firehose", region_name=region_name)
    extra_msg = {"region_name": region_name, "aws_service": "firehose"}
    LOG.info("firehose connection initiated", extra=extra_msg)
    return firehose_conn

async def put_record(data,
                     client,
                     delivery_stream_name="aws-ml-cert"):
    """Puts a single record onto a Firehose delivery stream.

    See:
    http://boto3.readthedocs.io/en/latest/reference/services/firehose.html#Firehose.Client.put_record
    """

    extra_msg = {"aws_service": "firehose"}
    LOG.info(f"Pushing record to firehose: {data}", extra=extra_msg)
    # boto3's put_record is a synchronous call wrapped in a coroutine here
    response = client.put_record(
        DeliveryStreamName=delivery_stream_name,
        Record={
            'Data': data
        }
    )
    return response

def gen_uuid_events():
    """Creates a timestamped, UUID-based event"""

    current_time = 'test-{date:%Y-%m-%d %H:%M:%S}'.format(date=datetime.datetime.now())
    event_id = str(uuid.uuid4())
    event = {event_id: current_time}
    return json.dumps(event)

def send_async_firehose_events(count=100):
    """Async sends events to Firehose"""

    start = time.time()
    client = firehose_client()
    extra_msg = {"aws_service": "firehose"}
    loop = asyncio.get_event_loop()
    tasks = []
    LOG.info(f"sending async events TOTAL {count}", extra=extra_msg)
    num = 0
    for _ in range(count):
        tasks.append(asyncio.ensure_future(put_record(gen_uuid_events(), client)))
        LOG.info(f"sending async events: COUNT {num}/{count}")
        num += 1
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    end = time.time()
    LOG.info("Total time: {}".format(end - start))

send_async_firehose_events(10)
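One design note on the example above: because boto3's put_record is a blocking call, the asyncio layer mainly organizes the puts as scheduled tasks rather than performing truly non-blocking I/O; a library such as aiobotocore would be one way to make the puts genuinely concurrent.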