aboutsummaryrefslogtreecommitdiffstats
path: root/src/transform_lambda.py
diff options
context:
space:
mode:
authorT-Aji <tolujbd2@gmail.com>2024-08-23 09:33:32 +0100
committerT-Aji <tolujbd2@gmail.com>2024-08-23 09:33:32 +0100
commitf7dedf78465d27abec5f467d377ec67741b44fb3 (patch)
treec4b3c8983f9c236b2505485ede3bc9154c22e91d /src/transform_lambda.py
parenta8cadadfe2b96c84a29a252110822ec535a0da7e (diff)
parentf4bd9e3c85341c0805821728d42d74c19cb16bde (diff)
downloadde-project-bentley-f7dedf78465d27abec5f467d377ec67741b44fb3.tar.gz
de-project-bentley-f7dedf78465d27abec5f467d377ec67741b44fb3.zip
Merge branch 'feature/transform-fact-sales-order' of https://github.com/ajschofield/de-project-bentley into feature/transform-fact-sales-order
Diffstat (limited to 'src/transform_lambda.py')
-rw-r--r--src/transform_lambda.py56
1 files changed, 52 insertions, 4 deletions
diff --git a/src/transform_lambda.py b/src/transform_lambda.py
index 920a24f..6024a24 100644
--- a/src/transform_lambda.py
+++ b/src/transform_lambda.py
@@ -2,10 +2,11 @@ import json
import boto3
import re
import pandas as pd
-
-
-def lambda_handler(event, context):
- pass
+import pyarrow as pa
+import pyarrow.parquet as pq
+from src.extract_lambda import extract_bucket
+from src.fact_purchase_table import *
+from src.fact_sales_order import create_dim_staff, create_dim_design, create_fact_sales_order
tables = [
@@ -22,6 +23,47 @@ tables = [
"payment_type",
]
+def lambda_handler(event, context):
+ dict_of_df = read_from_s3_subfolder_to_df(tables, extract_bucket(), client=boto3.client("s3"))
+ common_df_list = [create_dim_counterparty(dict_of_df),
+ create_dim_date(dict_of_df),
+ create_dim_location(dict_of_df),
+ create_dim_currency(dict_of_df),
+ create_dim_staff(dict_of_df)]
+
+ create_fact_purchase_order()
+
+ f_sales_list = [create_fact_sales_order(),
+ create_dim_design()]
+
+
+ '''
+ #dict{
+ sales_schema: {
+ Table_name: df_value,
+ ...}
+ payment_schema:
+ Table_name: df_value,
+ ...}
+ purchase_schema:
+ Table_name: df_value,
+ ...}
+ }
+
+ for schema in dict:
+ for table_name, df_value in schema.items():
+ parquet_file = df_value.to_parquet(f'{table_name}.parquet', engine='pyarrow'/'fastparquet'(?)) #we don't know the engine
+
+ s3_key = datetime.strftime(
+ datetime.today(), f"{schema}/%Y/%m/%d/{table_name}_%H:%M:%S.parquet"
+ )
+
+ client.upload_file(
+ parquet_file, transform_bucket(), s3_key)
+ ##might need seperate function for easier testing##
+ '''
+
+
def read_from_s3_subfolder_to_df(tables, bucket, client=boto3.client("s3")):
table_dfs = {}
@@ -34,4 +76,10 @@ def read_from_s3_subfolder_to_df(tables, bucket, client=boto3.client("s3")):
table_dfs[table] = pd.concat(list_of_df)
return table_dfs
+def transform_bucket(client=boto3.client("s3")):
+ response = client.list_buckets()
+ bucket_filter = [
+ bucket["Name"] for bucket in response["Buckets"] if "transform" in bucket["Name"]
+ ]
+ return bucket_filter[0]
git.ajschof.me — hosted by ajschofield — powered by cgit