aboutsummaryrefslogtreecommitdiffstats
path: root/src/transform_lambda.py
blob: 6024a24372f018f49890837f7fb499212163002e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import json
import boto3
import re
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from src.extract_lambda import extract_bucket
from src.fact_purchase_table import *
from src.fact_sales_order import create_dim_staff, create_dim_design, create_fact_sales_order


tables = [
    "sales_order",
    "transaction",
    "payment",
    "counterparty",
    "address",
    "staff",
    "purchase_order",
    "department",
    "currency",
    "design",
    "payment_type",
]

def lambda_handler(event, context):
    dict_of_df = read_from_s3_subfolder_to_df(tables, extract_bucket(), client=boto3.client("s3"))
    common_df_list = [create_dim_counterparty(dict_of_df), 
                      create_dim_date(dict_of_df), 
                      create_dim_location(dict_of_df), 
                      create_dim_currency(dict_of_df), 
                      create_dim_staff(dict_of_df)] 
    
    create_fact_purchase_order()

    f_sales_list = [create_fact_sales_order(),
                    create_dim_design()]
                    
    
    '''
    #dict{
        sales_schema: {
            Table_name: df_value, 
            ...}
        payment_schema: 
            Table_name: df_value, 
            ...}
        purchase_schema: 
            Table_name: df_value, 
            ...}
    }

    for schema in dict:
        for table_name, df_value in schema.items():
            parquet_file = df_value.to_parquet(f'{table_name}.parquet', engine='pyarrow'/'fastparquet'(?)) #we don't know the engine

            s3_key = datetime.strftime(
                        datetime.today(), f"{schema}/%Y/%m/%d/{table_name}_%H:%M:%S.parquet"
                    )

            client.upload_file(
            parquet_file, transform_bucket(), s3_key)
            ##might need seperate function for easier testing##
    '''



def read_from_s3_subfolder_to_df(tables, bucket, client=boto3.client("s3")):
    table_dfs = {}
    for table in tables:
        response = client.list_objects_v2(Bucket=bucket, Prefix=table)
        list_of_keys = [
            "s3://" + bucket + "/" + object["Key"] for object in response["Contents"]
        ]
        list_of_df = [pd.read_csv(key) for key in list_of_keys]
        table_dfs[table] = pd.concat(list_of_df)
    return table_dfs

def transform_bucket(client=boto3.client("s3")):
    response = client.list_buckets()
    bucket_filter = [
        bucket["Name"] for bucket in response["Buckets"] if "transform" in bucket["Name"]
    ]

    return bucket_filter[0]
git.ajschof.me — hosted by ajschofield — powered by cgit