about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: lian-manonog <lian.manonog@gmail.com> 2024-08-22 17:06:45 +0100
committer: lian-manonog <lian.manonog@gmail.com> 2024-08-22 17:06:45 +0100
commit: f4bd9e3c85341c0805821728d42d74c19cb16bde (patch)
tree: c44939f50371e67d2f301632d4138e2e96b26f83
parent: daee22145e8ce27425dd8de941b5ab65e6a619ae (diff)
download: de-project-bentley-f4bd9e3c85341c0805821728d42d74c19cb16bde.tar.gz
de-project-bentley-f4bd9e3c85341c0805821728d42d74c19cb16bde.zip
wip: wrote pseudocode for lambda handler in writing df to parquet file format and uploading the parquet files
-rw-r--r-- requirements.txt | 4
-rw-r--r-- src/fact_purchase_table.py (renamed from src/fact-purchase-table.py) | 2
-rw-r--r-- src/transform_lambda.py | 56
3 files changed, 56 insertions, 6 deletions
diff --git a/requirements.txt b/requirements.txt
index 62ebbf4..0c81216 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -29,4 +29,6 @@ urllib3==2.2.2
Werkzeug==3.0.3
xmltodict==0.13.0
s3fs
-pandas \ No newline at end of file
+pandas
+bs4
+pyarrow \ No newline at end of file
diff --git a/src/fact-purchase-table.py b/src/fact_purchase_table.py
index 597f104..f1d8fe1 100644
--- a/src/fact-purchase-table.py
+++ b/src/fact_purchase_table.py
@@ -1,3 +1,4 @@
+from bs4 import BeautifulSoup
from src.transform_lambda import read_from_s3_subfolder_to_df, tables
from src.extract_lambda import extract_bucket
import json
@@ -6,7 +7,6 @@ import re
import pandas as pd
from datetime import datetime as dt
import requests
-from bs4 import BeautifulSoup
## dim_staff table is the same across the schemas (no change)
diff --git a/src/transform_lambda.py b/src/transform_lambda.py
index 920a24f..6024a24 100644
--- a/src/transform_lambda.py
+++ b/src/transform_lambda.py
@@ -2,10 +2,11 @@ import json
import boto3
import re
import pandas as pd
-
-
-def lambda_handler(event, context):
- pass
+import pyarrow as pa
+import pyarrow.parquet as pq
+from src.extract_lambda import extract_bucket
+from src.fact_purchase_table import *
+from src.fact_sales_order import create_dim_staff, create_dim_design, create_fact_sales_order
tables = [
@@ -22,6 +23,47 @@ tables = [
"payment_type",
]
+def lambda_handler(event, context):
+ dict_of_df = read_from_s3_subfolder_to_df(tables, extract_bucket(), client=boto3.client("s3"))
+ common_df_list = [create_dim_counterparty(dict_of_df),
+ create_dim_date(dict_of_df),
+ create_dim_location(dict_of_df),
+ create_dim_currency(dict_of_df),
+ create_dim_staff(dict_of_df)]
+
+ create_fact_purchase_order()
+
+ f_sales_list = [create_fact_sales_order(),
+ create_dim_design()]
+
+
+ '''
+ #dict{
+ sales_schema: {
+ Table_name: df_value,
+ ...}
+ payment_schema:
+ Table_name: df_value,
+ ...}
+ purchase_schema:
+ Table_name: df_value,
+ ...}
+ }
+
+ for schema in dict:
+ for table_name, df_value in schema.items():
+ parquet_file = df_value.to_parquet(f'{table_name}.parquet', engine='pyarrow'/'fastparquet'(?)) #we don't know the engine
+
+ s3_key = datetime.strftime(
+ datetime.today(), f"{schema}/%Y/%m/%d/{table_name}_%H:%M:%S.parquet"
+ )
+
+ client.upload_file(
+ parquet_file, transform_bucket(), s3_key)
+ ##might need seperate function for easier testing##
+ '''
+
+
def read_from_s3_subfolder_to_df(tables, bucket, client=boto3.client("s3")):
table_dfs = {}
@@ -34,4 +76,10 @@ def read_from_s3_subfolder_to_df(tables, bucket, client=boto3.client("s3")):
table_dfs[table] = pd.concat(list_of_df)
return table_dfs
+def transform_bucket(client=boto3.client("s3")):
+ response = client.list_buckets()
+ bucket_filter = [
+ bucket["Name"] for bucket in response["Buckets"] if "transform" in bucket["Name"]
+ ]
+ return bucket_filter[0]
git.ajschof.me — hosted by ajschofield — powered by cgit