Lesson 2: Collection

Pragmatic AI Labs

This notebook was produced by Pragmatic AI Labs.

Lesson 2.1 Determine the operational characteristics of the collection system

Data Ingestion Concepts

Data Lakes

Central Repository for all data at any scale
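
As a rough illustration of keeping all data in one central repository, the sketch below builds date-partitioned object keys for a raw-data landing zone. It assumes the lake is stored in an object store such as S3; the `raw/` prefix, the `year=/month=/day=` layout, and the `lake_key` helper are illustrative assumptions, not something this notebook prescribes.

```python
from datetime import datetime, timezone


def lake_key(source: str, filename: str, when: datetime) -> str:
    """Build a date-partitioned object key for a raw-data landing zone.

    The layout (raw/<source>/year=/month=/day=/) is an assumed convention.
    """
    return (
        f"raw/{source}/"
        f"year={when:%Y}/month={when:%m}/day={when:%d}/"
        f"{filename}"
    )


# Hypothetical source name and file name, used only for illustration
key = lake_key(
    "clickstream",
    "events-0001.json",
    datetime(2019, 1, 15, tzinfo=timezone.utc),
)
print(key)
```

A key built this way could then be passed to an upload call such as boto3's `upload_file` against a bucket of your choosing.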

AWS Lake Formation

  • New service announced at re:Invent 2018
  • Build a secure data lake in days, not months
  • Enforce security policies
  • Gain and manage insights

AWS Batch (BATCH)

An example use case is financial services trade analysis.

Using AWS Batch for ML Jobs

https://aws.amazon.com/batch/

Example job submission tool

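import boto3
import click

# Assumed helper: the original calls submit_job without defining it;
# this minimal version wraps the boto3 Batch client.
def submit_job(job_name, job_queue, job_definition, command):
    """Submit a job to AWS Batch and return the API response"""
    client = boto3.client("batch")
    return client.submit_job(
        jobName=job_name,
        jobQueue=job_queue,
        jobDefinition=job_definition,
        containerOverrides={"command": list(command)},
    )

@click.group()
def cli():
    """Command-line entry point"""

@cli.group()
def run():
    """Run AWS Batch"""

@run.command("submit")
@click.option("--queue", default="first-run-job-queue", help="Batch Queue")
@click.option("--jobname", default="1", help="Name of Job")
@click.option("--jobdef", default="test", help="Job Definition")
# multiple=True: repeat --cmd for each token of the container command
@click.option("--cmd", multiple=True, default=["uname"], help="Container Override Commands")
def submit(queue, jobname, jobdef, cmd):
    """Submit a job"""

    result = submit_job(
        job_name=jobname,
        job_queue=queue,
        job_definition=jobdef,
        command=cmd,
    )
    click.echo("CLI:  Run Job Called")
    return result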
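
Once a job has been submitted, a natural next step is checking its status. The sketch below is one possible way to do that with the Batch `describe_jobs` call; the `job_status` helper and the `FakeBatchClient` stand-in are hypothetical names introduced here so the example runs without AWS credentials.

```python
def job_status(batch_client, job_id):
    """Return the status string for one AWS Batch job, or None if not found."""
    response = batch_client.describe_jobs(jobs=[job_id])
    jobs = response.get("jobs", [])
    return jobs[0]["status"] if jobs else None


class FakeBatchClient:
    """Hypothetical stand-in for boto3.client("batch") so the sketch runs offline."""

    def __init__(self, known_jobs):
        # Maps job id -> status, e.g. {"example-job-id": "SUCCEEDED"}
        self.known_jobs = known_jobs

    def describe_jobs(self, jobs):
        found = [
            {"jobId": job_id, "status": self.known_jobs[job_id]}
            for job_id in jobs
            if job_id in self.known_jobs
        ]
        return {"jobs": found}


fake = FakeBatchClient({"example-job-id": "SUCCEEDED"})
print(job_status(fake, "example-job-id"))
print(job_status(fake, "missing-job-id"))
```

With real credentials, passing `boto3.client("batch")` in place of the fake client should behave the same way, using the `jobId` returned when the job was submitted.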

Lambda (EVENTS)

  • Serverless
  • Used in most if not all ML Platforms
  • DeepLens
  • SageMaker
  • S3 Events
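
To make the S3 Events item concrete, here is a minimal sketch of a Lambda handler that reads the bucket name and object key out of an S3 event notification. The bucket and key in `sample_event` are made up, and the return shape is an arbitrary choice for illustration.

```python
from urllib.parse import unquote_plus


def handler(event, context):
    """Collect bucket/key pairs from an S3 event notification."""
    objects = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded in S3 event notifications
        key = unquote_plus(record["s3"]["object"]["key"])
        objects.append({"bucket": bucket, "key": key})
    return {"objects": objects}


# Trimmed-down, hypothetical event containing only the fields used above
sample_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "example-bucket"},
                "object": {"key": "incoming/my+file.csv"},
            }
        }
    ]
}
print(handler(sample_event, None))
```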

Getting started with AWS Lambda development in Python using Chalice

Demo on SageMaker Terminal

https://github.com/aws/chalice

Hello World Example:

$ pip install chalice
$ chalice new-project helloworld && cd helloworld
$ cat app.py

from chalice import Chalice

app = Chalice(app_name="helloworld")

@app.route("/")
def index():
    return {"hello": "world"}

$ chalice deploy
...
https://endpoint/api

$ curl https://endpoint/api
{"hello": "world"}

References:

Serverless Web Scraping Project

[Demo] Deploying Hello World Lambda Function

Using AWS Step Functions

https://aws.amazon.com/step-functions/
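
A Step Functions workflow is described as a state machine in the Amazon States Language (JSON). The sketch below builds a two-step definition in Python; the state names and Lambda ARNs are hypothetical placeholders, not taken from the example project.

```python
import json

# Hypothetical Lambda ARNs; replace with functions from your own account
SCRAPE_ARN = "arn:aws:lambda:us-east-1:123456789012:function:scrape"
STORE_ARN = "arn:aws:lambda:us-east-1:123456789012:function:store"

# Each Task state invokes one Lambda; "Next" chains states, "End" stops
definition = {
    "Comment": "Two-step collection workflow (illustrative)",
    "StartAt": "Scrape",
    "States": {
        "Scrape": {"Type": "Task", "Resource": SCRAPE_ARN, "Next": "Store"},
        "Store": {"Type": "Task", "Resource": STORE_ARN, "End": True},
    },
}

definition_json = json.dumps(definition, indent=2)
print(definition_json)
```

The resulting JSON string is the kind of value the boto3 `stepfunctions` client's `create_state_machine` call accepts as its `definition` argument, alongside a name and an IAM role ARN.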

Example Project:

https://github.com/noahgift/web_scraping_python

[Demo] Step Function

Lesson 2.2 Select a collection system that handles the frequency of data change and type of data being ingested

Lesson 2.3 Identify the properties that need to be enforced by the collection system: order, data structure, metadata, etc.

Lesson 2.4 Explain the durability and availability characteristics for the collection approach

Lesson 2.5 Learn AWS Kinesis Streams

Kinesis Ad Tech Pipeline

Kinesis IoT

Kinesis (STREAMING)

Solves Three Key Problems

  • Time-series Analytics
  • Real-time Dashboards
  • Real-time Metrics

Kinesis Analytics Workflow

Kinesis Real-Time Log Analytics Example

[Demo] Kinesis
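
For a sense of what a producer looks like, the following sketch puts one JSON record on a Kinesis data stream. Records that share a partition key land on the same shard, which is how per-key ordering is preserved. The stream name, the `send_event` helper, and the `FakeKinesisClient` stand-in are assumptions made so the example runs offline.

```python
import json


def send_event(kinesis_client, stream_name, device_id, payload):
    """Put one JSON record on a Kinesis data stream, keyed by device_id."""
    return kinesis_client.put_record(
        StreamName=stream_name,
        Data=json.dumps(payload).encode("utf-8"),
        PartitionKey=device_id,  # same key -> same shard -> ordered per device
    )


class FakeKinesisClient:
    """Hypothetical stand-in for boto3.client("kinesis"); records calls in memory."""

    def __init__(self):
        self.records = []

    def put_record(self, StreamName, Data, PartitionKey):
        self.records.append((StreamName, Data, PartitionKey))
        return {
            "ShardId": "shardId-000000000000",
            "SequenceNumber": str(len(self.records)),
        }


fake_kinesis = FakeKinesisClient()
response = send_event(
    fake_kinesis, "example-stream", "sensor-42", {"temperature": 21.5}
)
print(response)
```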

Lesson 2.6 Learn AWS Kinesis Firehose

Kinesis Features

Kinesis FAQ

  • Processes Data in Real-Time
  • Can process hundreds of TBs an hour
  • Example inputs are:
      • logs
      • financial transactions
      • streaming data

!pip install -q sensible

import asyncio
import time
import datetime
import uuid
import boto3
import json
from sensible.loginit import logger

LOG = logger(__name__)

def firehose_client(region_name="us-east-1"):
    """Kinesis Firehose client"""

    firehose_conn = boto3.client("firehose", region_name=region_name)
    extra_msg = {"region_name": region_name, "aws_service": "firehose"}
    LOG.info("firehose connection initiated", extra=extra_msg)
    return firehose_conn

async def put_record(data,
            client,
            delivery_stream_name="aws-ml-cert"):
    """
    See this:
        http://boto3.readthedocs.io/en/latest/reference/services/
        firehose.html#Firehose.Client.put_record
    """
    extra_msg = {"aws_service": "firehose"}
    LOG.info(f"Pushing record to firehose: {data}", extra=extra_msg)
    # boto3 calls block, so run the call in a worker thread (Python 3.9+)
    # to keep the event loop free for the other pending tasks
    response = await asyncio.to_thread(
        client.put_record,
        DeliveryStreamName=delivery_stream_name,
        Record={
            'Data': data
        }
    )
    return response


def gen_uuid_events():
    """Creates a time stamped UUID based event"""

    current_time = 'test-{date:%Y-%m-%d %H:%M:%S}'.format(date=datetime.datetime.now())
    event_id = str(uuid.uuid4())
    event = {event_id:current_time}
    return json.dumps(event)

def send_async_firehose_events(count=100):
    """Async sends events to firehose"""

    start = time.time()
    client = firehose_client()
    extra_msg = {"aws_service": "firehose"}
    LOG.info(f"sending async events TOTAL {count}", extra=extra_msg)

    async def send_all():
        # Tasks are created inside the running loop, then awaited together
        tasks = []
        for num in range(1, count + 1):
            tasks.append(asyncio.ensure_future(put_record(gen_uuid_events(), client)))
            LOG.info(f"sending async events: COUNT {num}/{count}")
        await asyncio.gather(*tasks)

    # asyncio.run creates and closes its own event loop; it raises
    # RuntimeError if a loop is already running (as in some notebook kernels)
    asyncio.run(send_all())
    end = time.time()
    LOG.info("Total time: {}".format(end - start))


send_async_firehose_events(10)
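
An alternative to sending one record per call is Firehose's `put_record_batch`, which accepts up to 500 records per request. The sketch below chunks a list of events accordingly and totals the `FailedPutCount` values; the `chunked` and `put_events_in_batches` helpers and the `FakeFirehoseClient` stand-in are hypothetical and exist only so the example runs without AWS access.

```python
def chunked(items, size=500):
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def put_events_in_batches(client, events, delivery_stream_name="aws-ml-cert"):
    """Send events with put_record_batch and return the total failed count."""
    failed = 0
    for batch in chunked(events):
        response = client.put_record_batch(
            DeliveryStreamName=delivery_stream_name,
            Records=[{"Data": event} for event in batch],
        )
        failed += response.get("FailedPutCount", 0)
    return failed


class FakeFirehoseClient:
    """Hypothetical stand-in for boto3.client("firehose"); tracks batch sizes."""

    def __init__(self):
        self.batch_sizes = []

    def put_record_batch(self, DeliveryStreamName, Records):
        self.batch_sizes.append(len(Records))
        return {"FailedPutCount": 0, "RequestResponses": []}


fake_firehose = FakeFirehoseClient()
events = [f'{{"id": {i}}}' for i in range(1200)]
failed_total = put_events_in_batches(fake_firehose, events)
print(fake_firehose.batch_sizes, failed_total)
```

A production version would likely also retry the individual records reported as failed in `RequestResponses`; that is left out here to keep the sketch short.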

Lesson 2.7 Use SQS
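
As a starting point for this lesson, here is a minimal sketch of the usual SQS consumer pattern: receive a batch of messages, process each one, then delete it so it is not delivered again. The queue URL, the `drain_queue` helper, and the `FakeSQSClient` stand-in are assumptions for illustration; only the `receive_message` and `delete_message` calls mirror the boto3 SQS client.

```python
def drain_queue(sqs_client, queue_url, handle):
    """Receive one batch of messages, process each, then delete it."""
    response = sqs_client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,  # SQS returns at most 10 messages per call
        WaitTimeSeconds=5,       # long polling reduces empty responses
    )
    messages = response.get("Messages", [])
    for message in messages:
        handle(message["Body"])
        # Delete only after successful processing
        sqs_client.delete_message(
            QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"]
        )
    return len(messages)


class FakeSQSClient:
    """Hypothetical stand-in for boto3.client("sqs") backed by a list."""

    def __init__(self, bodies):
        self.pending = {f"rh-{i}": body for i, body in enumerate(bodies)}

    def receive_message(self, QueueUrl, MaxNumberOfMessages, WaitTimeSeconds):
        items = list(self.pending.items())[:MaxNumberOfMessages]
        if not items:
            return {}
        return {"Messages": [{"ReceiptHandle": rh, "Body": b} for rh, b in items]}

    def delete_message(self, QueueUrl, ReceiptHandle):
        del self.pending[ReceiptHandle]


fake_sqs = FakeSQSClient(["first", "second"])
seen = []
processed = drain_queue(fake_sqs, "https://example.invalid/queue", seen.append)
print(processed, seen)
```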

Lesson 2.8 Create Data Pipelines