aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/fact_purchase_table.py (renamed from src/fact-purchase-table.py)2
-rw-r--r--src/transform_lambda.py56
2 files changed, 53 insertions, 5 deletions
diff --git a/src/fact-purchase-table.py b/src/fact_purchase_table.py
index 597f104..f1d8fe1 100644
--- a/src/fact-purchase-table.py
+++ b/src/fact_purchase_table.py
@@ -1,3 +1,4 @@
+from bs4 import BeautifulSoup
from src.transform_lambda import read_from_s3_subfolder_to_df, tables
from src.extract_lambda import extract_bucket
import json
@@ -6,7 +7,6 @@ import re
import pandas as pd
from datetime import datetime as dt
import requests
-from bs4 import BeautifulSoup
## dim_staff table is the same across the schemas (no change)
diff --git a/src/transform_lambda.py b/src/transform_lambda.py
index 920a24f..6024a24 100644
--- a/src/transform_lambda.py
+++ b/src/transform_lambda.py
@@ -2,10 +2,11 @@ import json
import boto3
import re
import pandas as pd
-
-
-def lambda_handler(event, context):
- pass
+import pyarrow as pa
+import pyarrow.parquet as pq
+from src.extract_lambda import extract_bucket
+from src.fact_purchase_table import *
+from src.fact_sales_order import create_dim_staff, create_dim_design, create_fact_sales_order
tables = [
@@ -22,6 +23,47 @@ tables = [
"payment_type",
]
+def lambda_handler(event, context):
+ dict_of_df = read_from_s3_subfolder_to_df(tables, extract_bucket(), client=boto3.client("s3"))
+ common_df_list = [create_dim_counterparty(dict_of_df),
+ create_dim_date(dict_of_df),
+ create_dim_location(dict_of_df),
+ create_dim_currency(dict_of_df),
+ create_dim_staff(dict_of_df)]
+
+ create_fact_purchase_order()
+
+ f_sales_list = [create_fact_sales_order(),
+ create_dim_design()]
+
+
+ '''
+ #dict{
+ sales_schema: {
+ Table_name: df_value,
+ ...}
+ payment_schema:
+ Table_name: df_value,
+ ...}
+ purchase_schema:
+ Table_name: df_value,
+ ...}
+ }
+
+ for schema in dict:
+ for table_name, df_value in schema.items():
+ parquet_file = df_value.to_parquet(f'{table_name}.parquet', engine='pyarrow'/'fastparquet'(?)) #we don't know the engine
+
+ s3_key = datetime.strftime(
+ datetime.today(), f"{schema}/%Y/%m/%d/{table_name}_%H:%M:%S.parquet"
+ )
+
+ client.upload_file(
+ parquet_file, transform_bucket(), s3_key)
+ ##might need seperate function for easier testing##
+ '''
+
+
def read_from_s3_subfolder_to_df(tables, bucket, client=boto3.client("s3")):
table_dfs = {}
@@ -34,4 +76,10 @@ def read_from_s3_subfolder_to_df(tables, bucket, client=boto3.client("s3")):
table_dfs[table] = pd.concat(list_of_df)
return table_dfs
+def transform_bucket(client=boto3.client("s3")):
+ response = client.list_buckets()
+ bucket_filter = [
+ bucket["Name"] for bucket in response["Buckets"] if "transform" in bucket["Name"]
+ ]
+ return bucket_filter[0]
git.ajschof.me — hosted by ajschofield — powered by cgit