From f4bd9e3c85341c0805821728d42d74c19cb16bde Mon Sep 17 00:00:00 2001 From: lian-manonog Date: Thu, 22 Aug 2024 17:06:45 +0100 Subject: wip: wrote pseudocode for lambda handler in writing df to parquet file format and uploading the parquet files --- src/fact-purchase-table.py | 71 ---------------------------------------------- src/fact_purchase_table.py | 71 ++++++++++++++++++++++++++++++++++++++++++++++ src/transform_lambda.py | 56 +++++++++++++++++++++++++++++++++--- 3 files changed, 123 insertions(+), 75 deletions(-) delete mode 100644 src/fact-purchase-table.py create mode 100644 src/fact_purchase_table.py (limited to 'src') diff --git a/src/fact-purchase-table.py b/src/fact-purchase-table.py deleted file mode 100644 index 597f104..0000000 --- a/src/fact-purchase-table.py +++ /dev/null @@ -1,71 +0,0 @@ -from src.transform_lambda import read_from_s3_subfolder_to_df, tables -from src.extract_lambda import extract_bucket -import json -import boto3 -import re -import pandas as pd -from datetime import datetime as dt -import requests -from bs4 import BeautifulSoup - - -## dim_staff table is the same across the schemas (no change) - -## dim_location from address --> drops 2 columns -def create_dim_location(dict_of_df): - df_loc = dict_of_df['address'].drop(labels=['created_at', 'last_updated'], axis=1).rename(columns={'address_id': 'location_id'}).set_index('location_id') - return df_loc - -## dim_counterparty from address and counterparty -def create_dim_counterparty(dict_of_df): - df_prefixed_address = dict_of_df['address'].add_prefix('counterparty_legal_', axis=1) - df_cp = pd.merge(dict_of_df['counterparty'], - df_prefixed_address, - left_on="legal_address_id", - right_on="address_id", - how="outer").set_index('counterparty_id') - return df_cp - -## fact_purchase_order from purchase_order -def create_fact_purchase_order(dict_of_df): - df_po = dict_of_df['purchase_order'] - df_po.index.name = 'purchase_record_id' - df_po['created_date'] = df_po['created_at'].date() - df_po['created_time'] = df_po['created_at'].dt.time - df_po['last_updated_date'] = df_po['last_updated_at'].date() - df_po['last_updated_time'] = df_po['last_updated_at'].dt.time - df_po['agreed_delivery_date'] = pd.to_datetime(df_po['agreed_delivery_date'],format="%Y-%m-%d") - df_po['agreed_payment_date'] = pd.to_datetime(df_po['agreed_payment_date'],format="%Y-%m-%d") - df_po.drop(labels=['created_at','last_updated_at'],axis=1,inplace=True) - return df_po - -## dim_date from purchase_order -def create_dim_date(dict_of_df): - sr_date = pd.concat([df['created_date'],df['last_updated_date'],df['agreed_delivery_date'],df['agreed_payment_date']]).sort() - df_date = pd.DataFrame(sr_date,columns='date_id') - df_date['year'] = df_date['date_id'].dt.year - df_date['month'] = df_date['date_id'].dt.month - df_date['day'] = df_date['date_id'].dt.day - df_date['day_of_week'] = df_date['date_id'].dt.dayofweek - df_date['day_name'] = df_date['date_id'].dt.day_name - df_date['month_name'] = df_date['date_id'].dt.month_name - df_date['quarter'] = df_date['date_id'].dt.quarter - df_date.set_index('date_id') - -def scrape_currency_names(): - response = requests.get('https://www.xe.com/currency/').content - soup = BeautifulSoup(response,'html.parser') - currency = [item.text for item in soup.findAll('a', attrs={'class' : "sc-299dec64-6 fZPTSw"})] - sr = pd.Series(currency) - df_cur = sr.str.split(pat=" - ",expand=True).rename({0:'currency_code',1:'currency_name'},axis=1) - return df_cur - -def create_dim_currency(dict_of_df,names=scrape_currency_names()): - df_cur = dict_of_df['currency'].drop(labels=['created_at', 'last_updated'], axis=1) - dim_cur = pd.merge(df_cur,names,left_on='currency_code',right_on='currency_code',how='inner').set_index('currency_id') - return dim_cur - - - - - diff --git a/src/fact_purchase_table.py b/src/fact_purchase_table.py new file mode 100644 index 0000000..f1d8fe1 --- /dev/null +++ b/src/fact_purchase_table.py @@ -0,0 +1,71 @@ +from bs4 import BeautifulSoup +from src.transform_lambda import read_from_s3_subfolder_to_df, tables +from src.extract_lambda import extract_bucket +import json +import boto3 +import re +import pandas as pd +from datetime import datetime as dt +import requests + + +## dim_staff table is the same across the schemas (no change) + +## dim_location from address --> drops 2 columns +def create_dim_location(dict_of_df): + df_loc = dict_of_df['address'].drop(labels=['created_at', 'last_updated'], axis=1).rename(columns={'address_id': 'location_id'}).set_index('location_id') + return df_loc + +## dim_counterparty from address and counterparty +def create_dim_counterparty(dict_of_df): + df_prefixed_address = dict_of_df['address'].add_prefix('counterparty_legal_', axis=1) + df_cp = pd.merge(dict_of_df['counterparty'], + df_prefixed_address, + left_on="legal_address_id", + right_on="address_id", + how="outer").set_index('counterparty_id') + return df_cp + +## fact_purchase_order from purchase_order +def create_fact_purchase_order(dict_of_df): + df_po = dict_of_df['purchase_order'] + df_po.index.name = 'purchase_record_id' + df_po['created_date'] = df_po['created_at'].date() + df_po['created_time'] = df_po['created_at'].dt.time + df_po['last_updated_date'] = df_po['last_updated_at'].date() + df_po['last_updated_time'] = df_po['last_updated_at'].dt.time + df_po['agreed_delivery_date'] = pd.to_datetime(df_po['agreed_delivery_date'],format="%Y-%m-%d") + df_po['agreed_payment_date'] = pd.to_datetime(df_po['agreed_payment_date'],format="%Y-%m-%d") + df_po.drop(labels=['created_at','last_updated_at'],axis=1,inplace=True) + return df_po + +## dim_date from purchase_order +def create_dim_date(dict_of_df): + sr_date = pd.concat([df['created_date'],df['last_updated_date'],df['agreed_delivery_date'],df['agreed_payment_date']]).sort() + df_date = pd.DataFrame(sr_date,columns='date_id') + df_date['year'] = df_date['date_id'].dt.year + df_date['month'] = df_date['date_id'].dt.month + df_date['day'] = df_date['date_id'].dt.day + df_date['day_of_week'] = df_date['date_id'].dt.dayofweek + df_date['day_name'] = df_date['date_id'].dt.day_name + df_date['month_name'] = df_date['date_id'].dt.month_name + df_date['quarter'] = df_date['date_id'].dt.quarter + df_date.set_index('date_id') + +def scrape_currency_names(): + response = requests.get('https://www.xe.com/currency/').content + soup = BeautifulSoup(response,'html.parser') + currency = [item.text for item in soup.findAll('a', attrs={'class' : "sc-299dec64-6 fZPTSw"})] + sr = pd.Series(currency) + df_cur = sr.str.split(pat=" - ",expand=True).rename({0:'currency_code',1:'currency_name'},axis=1) + return df_cur + +def create_dim_currency(dict_of_df,names=scrape_currency_names()): + df_cur = dict_of_df['currency'].drop(labels=['created_at', 'last_updated'], axis=1) + dim_cur = pd.merge(df_cur,names,left_on='currency_code',right_on='currency_code',how='inner').set_index('currency_id') + return dim_cur + + + + + diff --git a/src/transform_lambda.py b/src/transform_lambda.py index 920a24f..6024a24 100644 --- a/src/transform_lambda.py +++ b/src/transform_lambda.py @@ -2,10 +2,11 @@ import json import boto3 import re import pandas as pd - - -def lambda_handler(event, context): - pass +import pyarrow as pa +import pyarrow.parquet as pq +from src.extract_lambda import extract_bucket +from src.fact_purchase_table import * +from src.fact_sales_order import create_dim_staff, create_dim_design, create_fact_sales_order tables = [ @@ -22,6 +23,47 @@ tables = [ "payment_type", ] +def lambda_handler(event, context): + dict_of_df = read_from_s3_subfolder_to_df(tables, extract_bucket(), client=boto3.client("s3")) + common_df_list = [create_dim_counterparty(dict_of_df), + create_dim_date(dict_of_df), + create_dim_location(dict_of_df), + create_dim_currency(dict_of_df), + create_dim_staff(dict_of_df)] + + create_fact_purchase_order() + + f_sales_list = [create_fact_sales_order(), + create_dim_design()] + + + ''' + #dict{ + sales_schema: { + Table_name: df_value, + ...} + payment_schema: + Table_name: df_value, + ...} + purchase_schema: + Table_name: df_value, + ...} + } + + for schema in dict: + for table_name, df_value in schema.items(): + parquet_file = df_value.to_parquet(f'{table_name}.parquet', engine='pyarrow'/'fastparquet'(?)) #we don't know the engine + + s3_key = datetime.strftime( + datetime.today(), f"{schema}/%Y/%m/%d/{table_name}_%H:%M:%S.parquet" + ) + + client.upload_file( + parquet_file, transform_bucket(), s3_key) + ##might need seperate function for easier testing## + ''' + + def read_from_s3_subfolder_to_df(tables, bucket, client=boto3.client("s3")): table_dfs = {} @@ -34,4 +76,10 @@ def read_from_s3_subfolder_to_df(tables, bucket, client=boto3.client("s3")): table_dfs[table] = pd.concat(list_of_df) return table_dfs +def transform_bucket(client=boto3.client("s3")): + response = client.list_buckets() + bucket_filter = [ + bucket["Name"] for bucket in response["Buckets"] if "transform" in bucket["Name"] + ] + return bucket_filter[0] -- cgit v1.2.3