diff options
| author | T-Aji <tolujbd2@gmail.com> | 2024-08-23 09:33:32 +0100 |
|---|---|---|
| committer | T-Aji <tolujbd2@gmail.com> | 2024-08-23 09:33:32 +0100 |
| commit | f7dedf78465d27abec5f467d377ec67741b44fb3 (patch) | |
| tree | c4b3c8983f9c236b2505485ede3bc9154c22e91d /src/transform_lambda.py | |
| parent | a8cadadfe2b96c84a29a252110822ec535a0da7e (diff) | |
| parent | f4bd9e3c85341c0805821728d42d74c19cb16bde (diff) | |
| download | de-project-bentley-f7dedf78465d27abec5f467d377ec67741b44fb3.tar.gz de-project-bentley-f7dedf78465d27abec5f467d377ec67741b44fb3.zip | |
Merge branch 'feature/transform-fact-sales-order' of https://github.com/ajschofield/de-project-bentley into feature/transform-fact-sales-order
Diffstat (limited to 'src/transform_lambda.py')
| -rw-r--r-- | src/transform_lambda.py | 56 |
1 files changed, 52 insertions, 4 deletions
diff --git a/src/transform_lambda.py b/src/transform_lambda.py index 920a24f..6024a24 100644 --- a/src/transform_lambda.py +++ b/src/transform_lambda.py @@ -2,10 +2,11 @@ import json import boto3 import re import pandas as pd - - -def lambda_handler(event, context): - pass +import pyarrow as pa +import pyarrow.parquet as pq +from src.extract_lambda import extract_bucket +from src.fact_purchase_table import * +from src.fact_sales_order import create_dim_staff, create_dim_design, create_fact_sales_order tables = [ @@ -22,6 +23,47 @@ tables = [ "payment_type", ] +def lambda_handler(event, context): + dict_of_df = read_from_s3_subfolder_to_df(tables, extract_bucket(), client=boto3.client("s3")) + common_df_list = [create_dim_counterparty(dict_of_df), + create_dim_date(dict_of_df), + create_dim_location(dict_of_df), + create_dim_currency(dict_of_df), + create_dim_staff(dict_of_df)] + + create_fact_purchase_order() + + f_sales_list = [create_fact_sales_order(), + create_dim_design()] + + + ''' + #dict{ + sales_schema: { + Table_name: df_value, + ...} + payment_schema: + Table_name: df_value, + ...} + purchase_schema: + Table_name: df_value, + ...} + } + + for schema in dict: + for table_name, df_value in schema.items(): + parquet_file = df_value.to_parquet(f'{table_name}.parquet', engine='pyarrow'/'fastparquet'(?)) #we don't know the engine + + s3_key = datetime.strftime( + datetime.today(), f"{schema}/%Y/%m/%d/{table_name}_%H:%M:%S.parquet" + ) + + client.upload_file( + parquet_file, transform_bucket(), s3_key) + ##might need seperate function for easier testing## + ''' + + def read_from_s3_subfolder_to_df(tables, bucket, client=boto3.client("s3")): table_dfs = {} @@ -34,4 +76,10 @@ def read_from_s3_subfolder_to_df(tables, bucket, client=boto3.client("s3")): table_dfs[table] = pd.concat(list_of_df) return table_dfs +def transform_bucket(client=boto3.client("s3")): + response = client.list_buckets() + bucket_filter = [ + bucket["Name"] for bucket in response["Buckets"] if "transform" in bucket["Name"] + ] + return bucket_filter[0] |
