1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
#from src.extract_lambda import extract_bucket
import json
import boto3
import re
import io
from io import StringIO
import pandas as pd
## TODO: add the trigger from the extract bucket (via the console?)
## Trigger suffix must be .csv, so that only CSV files uploaded to the extract bucket are read
## In order to use the pandas module in a Lambda function, a Lambda layer needs to be attached to the function.
## TODO: need a function that normalises the data
#s3_resource = boto3.resource('s3') ##need this for a way of reuploading data after transformation
def lambda_handler(event, context):
    """Read the CSV object named in an S3 event and print its first rows.

    Args:
        event: S3 event notification. Only the first entry of
            ``event["Records"]`` is read.
        context: Lambda context object (unused).

    Returns:
        dict with ``statusCode`` 200 and an empty JSON string as ``body``.
        This is returned even when reading the object fails; the error is
        printed rather than raised.
    """
    # Imported locally so this block brings its own dependency into scope.
    from urllib.parse import unquote_plus

    s3_client = boto3.client('s3')
    try:
        s3_record = event["Records"][0]["s3"]
        s3_bucket_name = s3_record["bucket"]["name"]
        # Object keys are URL-encoded in S3 event notifications (a space
        # arrives as "+"), so decode before handing the key to get_object.
        s3_file_name = unquote_plus(s3_record["object"]["key"])
        ## concatenating the file per table - most recent
        ## iterate through the subfolders
        ## table name prefix to iterate through the files written to that table
        response = s3_client.get_object(Bucket=s3_bucket_name, Key=s3_file_name)
        # response['Body'] is the streaming body; read it fully and decode.
        csv_string = response['Body'].read().decode('utf-8')
        dataframe = pd.read_csv(StringIO(csv_string))
        print(dataframe.head(3))
    except Exception as err:
        # Best-effort: report the failure and still return the response below.
        print(err)
    # TODO implement
    return {
        'statusCode': 200,
        'body': json.dumps('')
    }
## Started from fresh on Wed 21st Aug:
# Table names, one per line. Presumably the list passed to
# read_from_s3_subfolder_to_df as `tables` -- confirm with callers.
tables = [
    "sales_order",
    "transaction",
    "payment",
    "counterparty",
    "address",
    "staff",
    "purchase_order",
    "department",
    "currency",
    "design",
    "payment_type",
]
def _iter_object_keys(client, bucket, prefix):
    """Yield every object key under *prefix*, following continuation tokens."""
    request = {'Bucket': bucket, 'Prefix': prefix}
    while True:
        response = client.list_objects_v2(**request)
        # 'Contents' is absent from the response when nothing matches.
        for entry in response.get('Contents', []):
            yield entry['Key']
        if not response.get('IsTruncated'):
            return
        request['ContinuationToken'] = response['NextContinuationToken']


def read_from_s3_subfolder_to_df(tables, bucket, client=None):
    """Load the CSV objects under each table's S3 subfolder into DataFrames.

    Args:
        tables: Iterable of table names. Each name is used as the key prefix
            ``"<table>/"`` within the bucket.
        bucket: Name of the S3 bucket to read from.
        client: S3 client used for listing and reading objects. When omitted,
            a ``boto3.client('s3')`` is created at call time.

    Returns:
        dict mapping each table name to a DataFrame built by concatenating
        every CSV object found under that table's prefix. A table with no
        objects maps to an empty DataFrame.
    """
    if client is None:
        # Created per call, not as a default argument, so that importing
        # this module does not build an AWS client as a side effect.
        client = boto3.client('s3')

    table_dfs = {}
    for table in tables:
        # The trailing "/" stops 'payment' from also matching keys that
        # belong to 'payment_type'.
        prefix = table if table.endswith('/') else table + '/'
        frames = []
        for key in _iter_object_keys(client, bucket, prefix):
            if key.endswith('/'):
                # Zero-byte "folder" placeholder object, not a CSV file.
                continue
            # Read through the supplied client so its credentials/endpoint
            # are honoured, rather than via an s3:// URL.
            body = client.get_object(Bucket=bucket, Key=key)['Body']
            frames.append(pd.read_csv(io.BytesIO(body.read())))
        table_dfs[table] = pd.concat(frames) if frames else pd.DataFrame()
    return table_dfs
|