<a href="https://colab.research.google.com/github/paiml/python_for_datascience/blob/master/Lesson14_Python_For_Data_Science_I_O.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Lesson 14: I/O

## Pragmatic AI Labs



![alt text](https://paiml.com/images/logo_with_slogan_white_background.png)

This notebook was produced by [Pragmatic AI Labs](https://paiml.com/).  You can continue learning about these topics by:

*   Buying a copy of [Pragmatic AI: An Introduction to Cloud-Based Machine Learning](http://www.informit.com/store/pragmatic-ai-an-introduction-to-cloud-based-machine-9780134863917)
*   Reading an online copy of [Pragmatic AI: An Introduction to Cloud-Based Machine Learning](https://www.safaribooksonline.com/library/view/pragmatic-ai-an/9780134863924/)
*  Watching video [Essential Machine Learning and AI with Python and Jupyter Notebook-Video-SafariOnline](https://www.safaribooksonline.com/videos/essential-machine-learning/9780135261118) on Safari Books Online.
* Watching video [AWS Certified Machine Learning-Specialty](https://learning.oreilly.com/videos/aws-certified-machine/9780135556597)
* Purchasing video [Essential Machine Learning and AI with Python and Jupyter Notebook- Purchase Video](http://www.informit.com/store/essential-machine-learning-and-ai-with-python-and-jupyter-9780135261095)
*   Viewing more content at [noahgift.com](https://noahgift.com/)


## 14.1 Reading and Writing Files and Serializing Data in Python


### Working with Files

#### Writing to a file with 'context'

In [0]:
with open("food.txt", "w") as workfile:
    workfile.write("whey protein\n")
    workfile.write("cliff bar")
!cat food.txt

whey protein
cliff bar

#### Reading a file with 'context'

In [0]:
with open("food.txt", "r") as workfile:
    #print(workfile.readlines())
    print(workfile.read())


whey protein
cliff bar
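For larger files, iterating over the file object reads one line at a time instead of loading everything with `read()`. The following is a minimal sketch, not part of the original notebook: the scratch path, the helper name `read_clean_lines`, and the extra "almonds" line are all illustrative assumptions.

```python
import os
import tempfile


def read_clean_lines(path):
    """Return the lines of a text file with trailing newlines removed."""
    with open(path, "r", encoding="utf-8") as workfile:
        # Iterating over the file object yields one line at a time.
        return [line.rstrip("\n") for line in workfile]


# Hypothetical scratch file so the notebook's food.txt is left untouched.
scratch_path = os.path.join(tempfile.mkdtemp(), "food_copy.txt")

with open(scratch_path, "w", encoding="utf-8") as workfile:
    workfile.write("whey protein\n")
    workfile.write("cliff bar\n")

# Mode "a" appends to the end of the file instead of truncating it.
with open(scratch_path, "a", encoding="utf-8") as workfile:
    workfile.write("almonds\n")

lines = read_clean_lines(scratch_path)
print(lines)
```

Opening with `"w"` a second time would have replaced the first two lines, which is why the sketch switches to `"a"` for the extra write.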


### Serialization Techniques


##### Ingest

In [0]:
import pandas as pd

In [0]:
df = pd.read_csv(
    "https://raw.githubusercontent.com/noahgift/food/master/data/features.en.openfoodfacts.org.products.csv")
df.drop(["Unnamed: 0", "exceeded", "g_sum", "energy_100g"], axis=1, inplace=True) #drop four columns we don't need
df = df.drop(df.index[[1,11877]]) #drop two outlier rows
df.rename(index=str, columns={"reconstructed_energy": "energy_100g"}, inplace=True)
df.head()

Unnamed: 0,fat_100g,carbohydrates_100g,sugars_100g,proteins_100g,salt_100g,energy_100g,product
0,28.57,64.29,14.29,3.57,0.0,2267.85,Banana Chips Sweetened (Whole)
2,57.14,17.86,3.57,17.86,1.22428,2835.7,Organic Salted Nut Mix
3,18.75,57.81,15.62,14.06,0.1397,1953.04,Organic Muesli
4,36.67,36.67,3.33,16.67,1.60782,2336.91,Zen Party Mix
5,18.18,60.0,21.82,14.55,0.02286,1976.37,Cinnamon Nut Granola


#### Serialize a Python Dictionary to Pickle

In [0]:
temp = df.head(1).to_dict('index')
mydict = list(temp.values())[0]
mydict

{'carbohydrates_100g': 64.29,
 'energy_100g': 2267.85,
 'fat_100g': 28.57,
 'product': 'Banana Chips Sweetened (Whole)',
 'proteins_100g': 3.57,
 'salt_100g': 0.0,
 'sugars_100g': 14.29}

In [0]:
import pickle

In [0]:
with open('mydictionary.pickle', 'wb') as picklefile:
    pickle.dump(mydict, picklefile)

In [0]:
!ls -l mydictionary.pickle

-rw-r--r-- 1 root root 225 Feb 14 19:30 mydictionary.pickle


In [0]:
with open('mydictionary.pickle', 'rb') as picklefile:
    res = pickle.load(picklefile)

In [0]:
res

{'carbohydrates_100g': 64.29,
 'energy_100g': 2267.85,
 'fat_100g': 28.57,
 'product': 'Banana Chips Sweetened (Whole)',
 'proteins_100g': 3.57,
 'salt_100g': 0.0,
 'sugars_100g': 14.29}
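Pickle can also serialize to bytes in memory, which is handy for caches or message queues. The sketch below is an illustration rather than part of the original notebook; it reuses the record shown above and assumes the payload comes from a trusted source.

```python
import pickle

# Sample record copied from the output above.
record = {
    "carbohydrates_100g": 64.29,
    "energy_100g": 2267.85,
    "fat_100g": 28.57,
    "product": "Banana Chips Sweetened (Whole)",
    "proteins_100g": 3.57,
    "salt_100g": 0.0,
    "sugars_100g": 14.29,
}

# dumps() returns bytes instead of writing to a file.
payload = pickle.dumps(record, protocol=pickle.HIGHEST_PROTOCOL)

# loads() rebuilds an equal, but distinct, dictionary.
# Only unpickle data you trust: loading can execute arbitrary code.
restored = pickle.loads(payload)

print(type(payload).__name__, restored == record, restored is record)
```

Because unpickling can run code, JSON or YAML is usually the safer choice for data that crosses a trust boundary.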

#### Serialize a Python Dictionary to JSON


In [0]:
import json
with open('data.json', 'w') as outfile:
    json.dump(res, outfile)

In [0]:
!cat data.json

{"fat_100g": 28.57, "carbohydrates_100g": 64.29, "sugars_100g": 14.29, "proteins_100g": 3.57, "salt_100g": 0.0, "energy_100g": 2267.85, "product": "Banana Chips Sweetened (Whole)"}

In [0]:
with open('data.json', 'r') as infile:
    res2 = json.load(infile)

In [0]:
res2

{'carbohydrates_100g': 64.29,
 'energy_100g': 2267.85,
 'fat_100g': 28.57,
 'product': 'Banana Chips Sweetened (Whole)',
 'proteins_100g': 3.57,
 'salt_100g': 0.0,
 'sugars_100g': 14.29}
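JSON only supports a few basic types, so values such as `datetime` objects need a fallback encoder. The sketch below is an illustration under assumptions: the `scanned_at` field is hypothetical and does not appear in the dataset.

```python
import datetime
import json

record = {
    "product": "Banana Chips Sweetened (Whole)",
    "energy_100g": 2267.85,
    # Hypothetical field: datetime objects are not JSON serializable by default.
    "scanned_at": datetime.datetime(2019, 2, 14, 19, 30),
}

# default=str is called for any object json cannot encode natively.
text = json.dumps(record, indent=2, sort_keys=True, default=str)
print(text)

# The round trip returns the timestamp as a plain string, not a datetime.
restored = json.loads(text)
print(type(restored["scanned_at"]).__name__)
```

The conversion is one-way: code that reads the file back has to parse the string itself if it needs a `datetime` again.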

#### Save to Yaml

In [0]:
import yaml

In [0]:
with open("data.yaml", "w") as yamlfile:                                               
    yaml.safe_dump(res2, yamlfile, default_flow_style=False)

In [0]:
!cat data.yaml

carbohydrates_100g: 64.29
energy_100g: 2267.85
fat_100g: 28.57
product: Banana Chips Sweetened (Whole)
proteins_100g: 3.57
salt_100g: 0.0
sugars_100g: 14.29


#### Load Yaml

In [0]:
with open("data.yaml", "rb") as yamlfile:                                               
    res3 = yaml.safe_load(yamlfile) 

In [0]:
type(res3)

dict

In [0]:
res3

{'carbohydrates_100g': 64.29,
 'energy_100g': 2267.85,
 'fat_100g': 28.57,
 'product': 'Banana Chips Sweetened (Whole)',
 'proteins_100g': 3.57,
 'salt_100g': 0.0,
 'sugars_100g': 14.29}

## 14.2 Reading and Writing Files and Serializing Data with Pandas

### Use Pandas DataFrames


#### Creating Pandas DataFrames

##### Creating DataFrames from a CSV file



*   Can be local
*   Can be hosted on a website



In [0]:
df = pd.read_csv(
    "https://raw.githubusercontent.com/noahgift/food/master/data/features.en.openfoodfacts.org.products.csv")
df.drop(["Unnamed: 0", "exceeded", "g_sum", "energy_100g"], axis=1, inplace=True) #drop four columns we don't need
df = df.drop(df.index[[1,11877]]) #drop two outlier rows
df.rename(index=str, columns={"reconstructed_energy": "energy_100g"}, inplace=True)
df.head()

Unnamed: 0,fat_100g,carbohydrates_100g,sugars_100g,proteins_100g,salt_100g,energy_100g,product
0,28.57,64.29,14.29,3.57,0.0,2267.85,Banana Chips Sweetened (Whole)
2,57.14,17.86,3.57,17.86,1.22428,2835.7,Organic Salted Nut Mix
3,18.75,57.81,15.62,14.06,0.1397,1953.04,Organic Muesli
4,36.67,36.67,3.33,16.67,1.60782,2336.91,Zen Party Mix
5,18.18,60.0,21.82,14.55,0.02286,1976.37,Cinnamon Nut Granola


In [0]:
df.median(numeric_only=True)  # skip the non-numeric "product" column

fat_100g                 3.170
carbohydrates_100g      22.390
sugars_100g              5.880
proteins_100g            4.000
salt_100g                0.635
energy_100g           1121.540
dtype: float64

##### List to Pandas DataFrame 

Convert a column to a list, and then convert the list to a Pandas DataFrame

In [0]:
products = df['product'].tolist()
products_df = pd.DataFrame(products)
products_df.columns = ["product"]
products_df.head(3)



Unnamed: 0,product
0,Banana Chips Sweetened (Whole)
1,Organic Salted Nut Mix
2,Organic Muesli


In [0]:
products_df.describe()

Unnamed: 0,product
count,44976
unique,30720
top,Ice Cream
freq,122


#### Exporting Pandas DataFrames

##### Pandas DataFrame to CSV

Write out DataFrame using to_csv


In [0]:
df.head().to_csv("small_food_records.csv")
!cat small_food_records.csv

,fat_100g,carbohydrates_100g,sugars_100g,proteins_100g,salt_100g,energy_100g,product
0,28.57,64.29,14.29,3.57,0.0,2267.85,Banana Chips Sweetened (Whole)
2,57.14,17.86,3.57,17.86,1.22428,2835.7,Organic Salted Nut Mix
3,18.75,57.81,15.62,14.06,0.1397,1953.04,Organic Muesli
4,36.67,36.67,3.33,16.67,1.60782,2336.91,Zen Party Mix
5,18.18,60.0,21.82,14.55,0.02286,1976.37,Cinnamon Nut Granola
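The leading comma in the header above is the unnamed index column. The sketch below shows how `index=False` leaves it out so the CSV round-trips without an extra column. It is a small illustration, assuming an in-memory buffer in place of a file and only three of the columns.

```python
import io

import pandas as pd

# Two rows copied from the output above, held in memory instead of on disk.
csv_text = (
    "fat_100g,energy_100g,product\n"
    "28.57,2267.85,Banana Chips Sweetened (Whole)\n"
    "18.75,1953.04,Organic Muesli\n"
)
small_df = pd.read_csv(io.StringIO(csv_text))

# index=False leaves the row labels out of the CSV.
buffer = io.StringIO()
small_df.to_csv(buffer, index=False)

# Reading the text back gives an equal DataFrame with no extra column.
round_trip = pd.read_csv(io.StringIO(buffer.getvalue()))
print(round_trip.columns.tolist())
print(round_trip.equals(small_df))
```

When the index is written (the default), reading the file back produces the `Unnamed: 0` column that the ingest cell drops.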


#### Using Pandas on Ray

More info on Pandas on Ray:  https://rise.cs.berkeley.edu/blog/pandas-on-ray/

Note:  Pandas is for small-data data science.  For larger datasets, you may need to use Spark or Pandas on Ray.


#### Using Pandas on Dask

Dask natively scales Python

https://dask.org/

#### Using Google Sheets with Pandas DataFrames

Reference:  [Official Google Colab Documentation on IO](https://colab.research.google.com/notebooks/io.ipynb)

**Install Google Spreadsheet Library**

In [0]:
!pip install --upgrade -q gspread

**Authenticate to API**

In [0]:
from google.colab import auth
auth.authenticate_user()

import gspread
from oauth2client.client import GoogleCredentials

gc = gspread.authorize(GoogleCredentials.get_application_default())

**Create a Spreadsheet and Put Items in It**

Note: an existing spreadsheet could be used instead.

In [0]:
sh = gc.create('pragmaticai-test')
worksheet = gc.open('pragmaticai-test').sheet1
cell_list = worksheet.range('A1:A10')

for cell in cell_list:
  cell.value = products.pop()
worksheet.update_cells(cell_list)

{'spreadsheetId': '1JUh0In3pmh6J7K5KQtGU1_6S9s3sB3cCa01meRIrYmU',
 'updatedCells': 10,
 'updatedColumns': 1,
 'updatedRange': 'Sheet1!A1:A10',
 'updatedRows': 10}

**Convert Spreadsheet Data to Pandas DataFrame**

In [0]:
worksheet = gc.open('pragmaticai-test').sheet1
rows = worksheet.get_all_values()
import pandas as pd
df = pd.DataFrame.from_records(rows)
print(df.median())
df

#df.head()

Series([], dtype: float64)


Unnamed: 0,0
0,100% Juice Reconstituted Lemon Juice With Adde...
1,"Cranberry Grape, 100% Juice Blend"
2,Cranberry Apple Flavored Juice Blended With On...
3,100% Juice Blend
4,"100% Juice, Prune Juice"
5,Prune Juice From Concentrate With Added Pulp
6,Prune Juice From Concentrate
7,100% Juice Grape Juice
8,White Grape Juice
9,100% Apple Juice From Concentrate


## 14.3 Reading and Writing using web resources

### Using Python Requests

In [0]:
import requests
url = """https://wikimedia.org/api/rest_v1/metrics/pageviews/per-article/en.wikipedia/all-access/user/LeBron_James/daily/2015070100/2017070500"""
result = requests.get(url)
result.json()["items"][0]


{'access': 'all-access',
 'agent': 'user',
 'article': 'LeBron_James',
 'granularity': 'daily',
 'project': 'en.wikipedia',
 'timestamp': '2015070100',
 'views': 18390}
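The `timestamp` field in each item is a string in `YYYYMMDDHH` form. The sketch below turns it into a date without calling the API again. It works on the single item shown above, and the helper name `to_daily_views` is an assumption made for illustration.

```python
import datetime

# One item copied from the API response shown above.
item = {
    "access": "all-access",
    "agent": "user",
    "article": "LeBron_James",
    "granularity": "daily",
    "project": "en.wikipedia",
    "timestamp": "2015070100",
    "views": 18390,
}


def to_daily_views(items):
    """Convert API items into (date, views) pairs."""
    pairs = []
    for entry in items:
        # The timestamp is formatted as YYYYMMDDHH, so strptime can parse it.
        day = datetime.datetime.strptime(entry["timestamp"], "%Y%m%d%H").date()
        pairs.append((day, entry["views"]))
    return pairs


daily_views = to_daily_views([item])
print(daily_views)
```

The resulting pairs can be passed straight to `pd.DataFrame(daily_views, columns=["date", "views"])` for further analysis.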

### Using Boto

In [0]:
import boto3
resource = boto3.resource("s3")
resource.meta.client.download_file('testntest', 'nba_2017_endorsement_full_stats.csv',
'/tmp/nba_2017_endorsement_full_stats.csv')

### Using Github Files

### Using Kaggle Files

Use kaggle by mounting Google Drive with credentials

## 14.4 Using Function-Based Concurrency

### Multiprocessing

#### Mapping processes to Functions

Processes are forked and run in true parallel across CPU cores, unlike threads, which are constrained by the Global Interpreter Lock (GIL) for CPU-bound code.

In [0]:
from multiprocessing import Pool
import datetime
import time
import random

def fight_club(x):
  
    sleep_time = random.randrange(0,3)
    time.sleep(sleep_time)
    timestamp = datetime.datetime.now()
    print(f"Calculating punch with attack strength {x} to the {x} power: @timestamp {timestamp} with sleep {sleep_time}")
    return x**x

if __name__ == '__main__':
    # The with block shuts down the worker processes when the map is done.
    with Pool(5) as p:
        print(p.map(fight_club, [1, 2, 3]))

Calculating punch with attack strength 1 to the 1 power: @timestamp 2019-02-14 19:50:11.489381 with sleep 1
Calculating punch with attack strength 3 to the 3 power: @timestamp 2019-02-14 19:50:11.489381 with sleep 1
Calculating punch with attack strength 2 to the 2 power: @timestamp 2019-02-14 19:50:12.490516 with sleep 2
[1, 4, 27]
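`Pool.map` returns results in input order once every worker has finished. The sketch below shows a variation, not the notebook's own approach: `imap_unordered` yields each result as soon as it is ready. The names `power` and `run_pool` are illustrative assumptions.

```python
from multiprocessing import Pool


def power(x):
    """Return x raised to its own power, as fight_club does above."""
    return x ** x


def run_pool(values, workers=3):
    """Map power over values in worker processes and collect the results."""
    # The with block terminates the worker processes on exit.
    with Pool(workers) as pool:
        # imap_unordered yields each result as soon as a worker finishes it,
        # so the results are sorted here to get a stable order.
        return sorted(pool.imap_unordered(power, values))


if __name__ == "__main__":
    print(run_pool([1, 2, 3]))
```

The `__main__` guard matters on platforms that start workers by spawning a fresh interpreter, because each worker re-imports the module.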


#### Process Pool Joined on Queue (Threadlike behavior)

Mimics the Threading interface, but with actual multi-core functionality

In [0]:
from multiprocessing import Process, Queue

def f(q):
    q.put(["armbar", "kimura",  "Mata Leão"])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(f"Grabbing some attacks: {q.get()}")    
    p.join()

Grabbing some attacks: ['armbar', 'kimura', 'Mata Leão']


### Async IO

#### Async IO in Python Examples

More info here:  https://docs.python.org/3/library/asyncio.html

**Using Python3 Async**

```python
import asyncio
import time

# firehose_client, put_record, gen_uuid_events, and LOG are assumed to be
# defined elsewhere; they are not shown in this notebook.

def send_async_firehose_events(count=100):
    """Async sends events to firehose"""

    start = time.time()
    client = firehose_client()
    extra_msg = {"aws_service": "firehose"}
    loop = asyncio.get_event_loop()
    tasks = []
    LOG.info(f"sending async events TOTAL {count}", extra=extra_msg)
    for num in range(count):
        # Schedule one coroutine per event; none of them run until the loop does.
        tasks.append(asyncio.ensure_future(put_record(gen_uuid_events(), client)))
        LOG.info(f"sending async events: COUNT {num}/{count}")
    # Block until every scheduled task has finished.
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    end = time.time()
    LOG.info("Total time: {}".format(end - start))
  ```
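The same fan-out pattern can be tried without AWS. This is a self-contained sketch under stated assumptions: `put_record_stub` is a hypothetical stand-in that only sleeps, and `asyncio.run` with `asyncio.gather` is used in place of managing the event loop by hand.

```python
import asyncio
import time


async def put_record_stub(event_id, delay=0.1):
    """Hypothetical stand-in for a network call; it only sleeps."""
    await asyncio.sleep(delay)
    return f"sent event {event_id}"


async def send_events(count=5):
    """Start all calls at once and wait for every result."""
    tasks = [put_record_stub(event_id) for event_id in range(count)]
    # gather() runs the coroutines concurrently and keeps input order.
    return await asyncio.gather(*tasks)


start = time.perf_counter()
results = asyncio.run(send_events())
elapsed = time.perf_counter() - start
print(results)
print(f"Total time: {elapsed:.2f}s")
```

Because the sleeps overlap, the total time should be close to one delay rather than the sum of all five. Inside a notebook that already has a running event loop, `asyncio.run` raises an error; `await send_events()` in a cell is the usual alternative.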

**Using trollius library with Python 2:  DEPRECATED**

```python
"""Generates an Async MetaData call.  Note, this isn't available in Boto3
In [56]: res = all_metadata_async()
In [57]: res
Out[57]: 
[('ami-manifest-path', <Response [200]>),
 ('instance-type', <Response [200]>),
 ('instance-id', <Response [200]>),
 ('iam', <Response [200]>),
 ('local-hostname', <Response [200]>),
 ('network', <Response [200]>),
 ('hostname', <Response [200]>),
 ('ami-id', <Response [200]>),
 ('instance-action', <Response [200]>),
 ('profile', <Response [200]>),
 ('reservation-id', <Response [200]>),
 ('security-groups', <Response [200]>),
 ('metrics', <Response [200]>),
 ('mac', <Response [200]>),
 ('public-ipv4', <Response [200]>),
 ('services', <Response [200]>),
 ('local-ipv4', <Response [200]>),
 ('placement', <Response [200]>),
 ('ami-launch-index', <Response [200]>),
 ('public-hostname', <Response [200]>),
 ('public-keys', <Response [200]>),
 ('block-device-mapping', <Response [200]>)]
"""

import os

import requests
import trollius

def get_metadata_api_urls():
    """Retrieves the api endpoints for metadata"""

    full_urls = {}
    metadata_url = "http://169.254.169.254/latest/meta-data/"
    resp = requests.get(metadata_url)
    urls = resp.content.split()
    for url in urls:
        stripped_url = url.rstrip("/")
        full_urls[stripped_url]=(os.path.join(metadata_url, url))
    return full_urls

def _get(key_url):
    key,url = key_url
    return key, requests.get(url)

def _do_calls(urls):
    loop = trollius.get_event_loop()
    futures = []
    for url in urls:
        futures.append(loop.run_in_executor(None, _get, url))
    return futures

@trollius.coroutine
def call():
    results = []
    futures = _do_calls(get_metadata_api_urls().items())
    for future in futures:
        result = yield trollius.From(future)
        results.append(result)
    raise trollius.Return(results)

def all_metadata_async():
    """Retrieves all available metadata for an instance async"""

    loop = trollius.get_event_loop()
    res = loop.run_until_complete(call())
    return res
   ```


### Serverless or FaaS (Functions as a service)

#### AWS Lambda

#####  AWS Lambda and Chalice Example

Standalone Lambda with Chalice:  http://chalice.readthedocs.io/en/latest/

```python
@app.lambda_function()
def send_message(event, context):
    """Send a message to a channel"""

    slack_client = SlackClient(SLACK_TOKEN)
    res = slack_client.api_call(
      "chat.postMessage",
      channel="#general",
      text=event
    )
    return res
```


#### Fn Project

##### Fn Project

![Fn Project](https://camo.githubusercontent.com/aad13cfe0e267f38143fd8cc6816ab8adde37a56/687474703a2f2f666e70726f6a6563742e696f2f696d616765732f666e2d333030783132352e706e67)


*   [FN Project](https://fnproject.io/)
*   [FN Project Python Example](http://fnproject.io/tutorials/python/intro/)



```bash

fn init --runtime python --trigger http pythonfn

```



```python

import fdk
import json


def handler(ctx, data=None, loop=None):
    name = "World"
    if data and len(data) > 0:
        body = json.loads(data)
        name = body.get("name")
    return {"message": "Hello {0}".format(name)}



if __name__ == "__main__":
    fdk.handle(handler)
```







### Large Scale Concurrency Solutions

#### Larger Scale Concurrency



*   [AWS Step Functions with Lambda](https://aws.amazon.com/step-functions/)

![alt text](https://d1.awsstatic.com/product-marketing/Step%20Functions/OrderFullScreen.0e74c2f19d89a9325addb5bd746cd895b2e4c9c2.jpg)

*   [AWS Batch](https://aws.amazon.com/batch/)
![alt text](https://d1.awsstatic.com/Test%20Images/Kate%20Test%20Images/Dilithium_flowchart%20diagrams_v3_kw-02.322877d73eda8ed71a44db216a1d195550befac0.png)

*   [RabbitMQ Worker Farms-IBM Developerworks Article](https://www.ibm.com/developerworks/cloud/library/cl-optimizepythoncloud1/index.html)

![alt text](https://www.ibm.com/developerworks/cloud/library/cl-optimizepythoncloud2/figure1.gif)





### High Level Concurrency Overview for Machine Learning and HPC (High Performance Computing)


#### Diagram of Python Performance Problems


![63,000X Speedup for Matrix Multiply from Standard Python](https://user-images.githubusercontent.com/58792/45932870-37339000-bf38-11e8-8272-bf2addf56df1.png)

Source:  [Dave Patterson, UC Berkeley](https://www2.eecs.berkeley.edu/Faculty/Homepages/patterson.html)

#### Numba

[Numba](http://numba.pydata.org/)

*   Open source JIT (just-in-time) compiler
*   Translates a subset of Python and NumPy code into fast machine code
*   Can approach the speed of C
*   Can also parallelize:  "true threads" and "GPU"





##### Install Numba

In [0]:
!pip3 install numba



##### Use Numba

In [0]:
from numba import (cuda, vectorize)
import numba
import pandas as pd
import numpy as np

In [0]:
def real_estate_df():
    """30 Years of Housing Prices"""

    df = pd.read_csv("https://raw.githubusercontent.com/noahgift/real_estate_ml/master/data/Zip_Zhvi_SingleFamilyResidence.csv")
    df.rename(columns={"RegionName":"ZipCode"}, inplace=True)
    df["ZipCode"]=df["ZipCode"].map(lambda x: "{:.0f}".format(x))
    df["RegionID"]=df["RegionID"].map(lambda x: "{:.0f}".format(x))
    return df

def numerical_real_estate_array(df):
    """Converts df to numpy numerical array"""

    columns_to_drop = ['RegionID', 'ZipCode', 'City', 'State', 'Metro', 'CountyName']
    df_numerical = df.dropna()
    df_numerical = df_numerical.drop(columns_to_drop, axis=1)
    return df_numerical.values

def real_estate_array():
    """Returns Real Estate Array"""

    df = real_estate_df()
    rea = numerical_real_estate_array(df)
    return np.float32(rea)
  
rea = real_estate_array()

##### Use Numba decorator

In [0]:
import numba

In [0]:
@numba.jit(nopython=True)
def expmean_jit(rea):
    """Perform multiple mean calculations"""

    val = rea.mean() ** 2
    return val
  
expmean_jit(rea)

44968886272.0

##### Multi-threaded numba

True multi-threaded code (Warning: this will use all cores on any machine that runs it)

In [0]:
@numba.jit(parallel=True)
def add_sum_threaded(rea):
    """Use all the cores"""

    x,_ = rea.shape
    total = 0
    for _ in numba.prange(x):
        total += rea.sum()  
        print(total)
        
add_sum_threaded(rea)

550019399680.0550019399680.0

1100038799360.01100038799360.0

1650058199040.0
1650058199040.0
2200077598720.0
2200077598720.0
2750096998400.0
2750096998400.0
3300116398080.0
3300116398080.0
3850135797760.0
3850135797760.0
4400155197440.0
4950174597120.0
4400155197440.0
5500193996800.0
4950174597120.0
6050213396480.0
5500193996800.0
6600232796160.0
6050213396480.0
7150252195840.0
6600232796160.0
7700271595520.0
7150252195840.0
8250290995200.0
7700271595520.0
8800310394880.0
8250290995200.0
9350329794560.0
8800310394880.0
9900349194240.0
9350329794560.0
10450368593920.0
9900349194240.0
11000387993600.0
10450368593920.011550407393280.0

12100426792960.0
11000387993600.0
12650446192640.0
11550407393280.0
13200465592320.0
12100426792960.0
13750484992000.012650446192640.0

14300504391680.013200465592320.0

13750484992000.0
14850523791360.0
14300504391680.0
15400543191040.0
14850523791360.0
15950562590720.0
15400543191040.0
16500581990400.0
15950562590720.0
17050601390080.0
16500581990400.0
1

#### GPU 

Heavily used in Deep Learning

*   NVIDIA
  - Numba [CUDA GPU](http://numba.pydata.org/numba-doc/latest/cuda/index.html)
*  AMD
  - Numba [AMD ROC GPU](http://numba.pydata.org/numba-doc/latest/roc/index.html)



##### Use GPU

In [0]:
@vectorize(['float32(float32, float32)'], target='cuda')
def add_ufunc(x, y):
    return x + y
  
def cuda_operation():
    """Performs Vectorized Operations on GPU"""

    x = real_estate_array()
    y = real_estate_array()

    print("Moving calculations to GPU memory")
    x_device = cuda.to_device(x)
    y_device = cuda.to_device(y)
    out_device = cuda.device_array(
        shape=(x_device.shape[0],x_device.shape[1]), dtype=np.float32)
    print(x_device)
    print(x_device.shape)
    print(x_device.dtype)

    print("Calculating on GPU")
    add_ufunc(x_device,y_device, out=out_device)

    out_host = out_device.copy_to_host()
    print(f"Calculations from GPU {out_host}")
    
cuda_operation()

#### TPU

[Tensor Processing Unit](https://cloud.google.com/tpu/docs/tpus)



*   "Google’s custom-developed application-specific integrated circuits (ASICs) used to accelerate machine learning workloads"
*   Available both in Colab notebooks and on Google Cloud

