From 8e20c5c0f43d0f0c4983c8895396de7f62b7c390 Mon Sep 17 00:00:00 2001 From: lian-manonog Date: Fri, 23 Aug 2024 11:06:43 +0100 Subject: Deleted the fact_table schema py files Completed Lambda_handler for transform_lambda - and other helper functions. Testing is still to be done. Need to implement lambda layer to share helper functions across all lambdas --- src/fact_payment.py | 30 ------- src/fact_purchase_table.py | 71 ---------------- src/fact_sales_order.py | 91 --------------------- src/transform_lambda.py | 198 +++++++++++++++++++++++++++++++++++---------- 4 files changed, 157 insertions(+), 233 deletions(-) delete mode 100644 src/fact_payment.py delete mode 100644 src/fact_purchase_table.py delete mode 100644 src/fact_sales_order.py diff --git a/src/fact_payment.py b/src/fact_payment.py deleted file mode 100644 index 92de67c..0000000 --- a/src/fact_payment.py +++ /dev/null @@ -1,30 +0,0 @@ -import pandas as pd - -def create_dim_payment_type(dict_of_df): - df_payment_type = dict_of_df["payment_type"] - dim_payment_type = df_payment_type.loc[:, ["payment_type_id", "payment_type_name"]] - return dim_payment_type - -def create_fact_payment(dict_of_df): - df_payment = dict_of_df["payment"] - df_payment.index.name = "payment_record_id" - df_payment["created_date"] = pd.to_datetime(df_payment["created_at"]).dt.date - df_payment["created_time"] = pd.to_datetime(df_payment["created_at"]).dt.time - df_payment["last_updated_date"] = pd.to_datetime(df_payment["last_updated"]).dt.date - df_payment["last_updated_time"] = pd.to_datetime(df_payment["last_updated"]).dt.time - fact_payment = df_payment.loc[:,[ - "payment_record_id", - "payment_id", - "created_date", - "created_time", - "last_updated_date", - "last_updated_time", - "transaction_id", - "counterparty_id", - "payment_amount", - "currency_id", - "payment_type_id", - "paid", - "payment_date" - ]] - return fact_payment diff --git a/src/fact_purchase_table.py b/src/fact_purchase_table.py deleted file mode 100644 index f1d8fe1..0000000 --- a/src/fact_purchase_table.py +++ /dev/null @@ -1,71 +0,0 @@ -from bs4 import BeautifulSoup -from src.transform_lambda import read_from_s3_subfolder_to_df, tables -from src.extract_lambda import extract_bucket -import json -import boto3 -import re -import pandas as pd -from datetime import datetime as dt -import requests - - -## dim_staff table is the same across the schemas (no change) - -## dim_location from address --> drops 2 columns -def create_dim_location(dict_of_df): - df_loc = dict_of_df['address'].drop(labels=['created_at', 'last_updated'], axis=1).rename(columns={'address_id': 'location_id'}).set_index('location_id') - return df_loc - -## dim_counterparty from address and counterparty -def create_dim_counterparty(dict_of_df): - df_prefixed_address = dict_of_df['address'].add_prefix('counterparty_legal_', axis=1) - df_cp = pd.merge(dict_of_df['counterparty'], - df_prefixed_address, - left_on="legal_address_id", - right_on="address_id", - how="outer").set_index('counterparty_id') - return df_cp - -## fact_purchase_order from purchase_order -def create_fact_purchase_order(dict_of_df): - df_po = dict_of_df['purchase_order'] - df_po.index.name = 'purchase_record_id' - df_po['created_date'] = df_po['created_at'].date() - df_po['created_time'] = df_po['created_at'].dt.time - df_po['last_updated_date'] = df_po['last_updated_at'].date() - df_po['last_updated_time'] = df_po['last_updated_at'].dt.time - df_po['agreed_delivery_date'] = pd.to_datetime(df_po['agreed_delivery_date'],format="%Y-%m-%d") - df_po['agreed_payment_date'] = pd.to_datetime(df_po['agreed_payment_date'],format="%Y-%m-%d") - df_po.drop(labels=['created_at','last_updated_at'],axis=1,inplace=True) - return df_po - -## dim_date from purchase_order -def create_dim_date(dict_of_df): - sr_date = pd.concat([df['created_date'],df['last_updated_date'],df['agreed_delivery_date'],df['agreed_payment_date']]).sort() - df_date = pd.DataFrame(sr_date,columns='date_id') - df_date['year'] = df_date['date_id'].dt.year - df_date['month'] = df_date['date_id'].dt.month - df_date['day'] = df_date['date_id'].dt.day - df_date['day_of_week'] = df_date['date_id'].dt.dayofweek - df_date['day_name'] = df_date['date_id'].dt.day_name - df_date['month_name'] = df_date['date_id'].dt.month_name - df_date['quarter'] = df_date['date_id'].dt.quarter - df_date.set_index('date_id') - -def scrape_currency_names(): - response = requests.get('https://www.xe.com/currency/').content - soup = BeautifulSoup(response,'html.parser') - currency = [item.text for item in soup.findAll('a', attrs={'class' : "sc-299dec64-6 fZPTSw"})] - sr = pd.Series(currency) - df_cur = sr.str.split(pat=" - ",expand=True).rename({0:'currency_code',1:'currency_name'},axis=1) - return df_cur - -def create_dim_currency(dict_of_df,names=scrape_currency_names()): - df_cur = dict_of_df['currency'].drop(labels=['created_at', 'last_updated'], axis=1) - dim_cur = pd.merge(df_cur,names,left_on='currency_code',right_on='currency_code',how='inner').set_index('currency_id') - return dim_cur - - - - - diff --git a/src/fact_sales_order.py b/src/fact_sales_order.py deleted file mode 100644 index 425b144..0000000 --- a/src/fact_sales_order.py +++ /dev/null @@ -1,91 +0,0 @@ -import pandas as pd - - -def create_dim_design(dict_of_df): - df_design = dict_of_df["design"] - dim_design = df_design.loc[:, ["design_id", "design_name", "file_name", "file_location"]] - return dim_design - -def create_dim_staff(dict_of_df): - staff_department = pd.merge(dict_of_df["staff"], dict_of_df["department"], on='department_id', how="left") - dim_staff = staff_department.loc[:, ['staff_id', 'first_name', 'last_name', 'department_name', 'location', 'email_address']] - return dim_staff - -def create_dim_currency(dict_of_df): - df_currency = dict_of_df["currency"] - dim_currency = df_currency.loc[:, ["currency_id", "currency_code"]] - mappings = { - "GBP": "Pound", - "USD": "US Dollar", - "EUR": "Euro" - } - dim_currency["currency_name"] = dim_currency["currency_code"].map(mappings) - return dim_currency - - -def create_dim_date(dict_of_df): - df_sales = dict_of_df["sales"] - df_sales = df_sales.loc[:, ["agreed_delivery_date"]] - df_sales["agreed_delivery_date"] = pd.to_datetime["agreed_delivery_date"] - df_sales["year"] = df_sales["agreed_delivery_date"].dt.year - df_sales["month"] = df_sales["agreed_delivery_date"].dt.month - df_sales["day"] = df_sales["agreed_delivery_date"].dt.day - df_sales["day_of_week"] = df_sales["agreed_delivery_date"].dt.dayofweek - df_sales["day_name"] = df_sales["agreed_delivery_date"].dt.day_name() - df_sales["month_name"] = df_sales["agreed_delivery_date"].dt.month_name() - df_sales["quarter"] = df_sales["agreed_delivery_date"].dt.quarter() - dim_date = ["date_id", "year", "month", "day", "day_of_week", "day_name", "month_name", "quarter"] #series.dt.quarter() - return dim_date - -def create_fact_sales_order(dict_of_df): - df_sales = dict_of_df["sales_order"] - df_sales.index.name = "sales_record_id" - df_sales["created_date"] = pd.to_datetime(df_sales["created_at"]).dt.date - df_sales["created_time"] = pd.to_datetime(df_sales["created_at"]).dt.time - df_sales["last_updated_date"] = pd.to_datetime(df_sales["last_updated"]).dt.date - df_sales["last_updated_time"] = pd.to_datetime(df_sales["last_updated"]).dt.time - pd.merge(dict_of_df["staff"], df_sales["sales_staff_id"], on="staff_id", how="left") - # df_sales.rename(columns={"staff_id": "sales_staff_id"}) - fact_sales_order = df_sales.loc[:,[ - "sales_record_id", - "sales_order_id", - "created_date", - "created_time", - "last_updated_date", - "last_updated_time", - "sales_staff_id", - "counterparty_id", - "units_sold", - "unit_price", - "currency_id", - "design_id", - "agreed_payment_date", - "agreed_delivery_date", - "agreed_delivery_location_id" - ]] - return fact_sales_order - -# TO DO: -# complete dim_date from merged fact table -# merge dataframes into one dataframe -# remove duplicates -# test dim_date and fact_sales_order - -def create_sales_star_schema(dict_of_df): - dim_design = create_dim_design(dict_of_df) - dim_staff = create_dim_staff(dict_of_df) - dim_currency = create_dim_currency(dict_of_df) - dim_date = create_dim_date(dict_of_df) - - fact_sales_order = create_fact_sales_order(dict_of_df) - - fact_sales_order = fact_sales_order.merge(dim_design, on='design_id', how='left') - fact_sales_order = fact_sales_order.merge(dim_staff, left_on='sales_staff_id', right_on='staff_id', how='left') - fact_sales_order = fact_sales_order.merge(dim_currency, on='currency_id', how='left') - fact_sales_order = fact_sales_order.merge(dim_date, left_on='agreed_delivery_date', right_on='date_id', how='left') - - return fact_sales_order - - - - diff --git a/src/transform_lambda.py b/src/transform_lambda.py index 6024a24..d30d91d 100644 --- a/src/transform_lambda.py +++ b/src/transform_lambda.py @@ -1,13 +1,35 @@ import json import boto3 import re +import logging import pandas as pd import pyarrow as pa import pyarrow.parquet as pq -from src.extract_lambda import extract_bucket -from src.fact_purchase_table import * -from src.fact_sales_order import create_dim_staff, create_dim_design, create_fact_sales_order +from src.dataframes import * +# from src.extract_lambda import extract_bucket, DBConnectionException +import boto3 +from botocore.exceptions import ClientError +from pg8000.native import Connection, InterfaceError +from datetime import datetime + +class DBConnectionException(Exception): + """Wraps pg8000.native Error or DatabaseError.""" + + def __init__(self, e): + """Initialise with provided error message.""" + self.message = str(e) + super().__init__(self.message) + +logger = logging.getLogger(__name__) +logging.basicConfig( + format="{asctime} - {levelname} - {message}", + style="{", + datefmt="%Y-%m-%d %H:%M", + level=logging.DEBUG, +) + +logging.getLogger("botocore").setLevel(logging.WARNING) tables = [ "sales_order", @@ -24,47 +46,124 @@ tables = [ ] def lambda_handler(event, context): - dict_of_df = read_from_s3_subfolder_to_df(tables, extract_bucket(), client=boto3.client("s3")) - common_df_list = [create_dim_counterparty(dict_of_df), - create_dim_date(dict_of_df), - create_dim_location(dict_of_df), - create_dim_currency(dict_of_df), - create_dim_staff(dict_of_df)] + db = None - create_fact_purchase_order() + try: + db = connect_to_database() + bucket = bucket_name('transform') + existing_s3_files = list_existing_s3_files(bucket) - f_sales_list = [create_fact_sales_order(), - create_dim_design()] - - - ''' - #dict{ - sales_schema: { - Table_name: df_value, - ...} - payment_schema: - Table_name: df_value, - ...} - purchase_schema: - Table_name: df_value, - ...} - } - - for schema in dict: - for table_name, df_value in schema.items(): - parquet_file = df_value.to_parquet(f'{table_name}.parquet', engine='pyarrow'/'fastparquet'(?)) #we don't know the engine - - s3_key = datetime.strftime( - datetime.today(), f"{schema}/%Y/%m/%d/{table_name}_%H:%M:%S.parquet" - ) - - client.upload_file( - parquet_file, transform_bucket(), s3_key) - ##might need seperate function for easier testing## - ''' + dict_of_df = read_from_s3_subfolder_to_df(tables, extract_bucket(), client=boto3.client("s3")) + + immutable_df_dict = { + 'dim_counterparty': create_dim_counterparty(dict_of_df), + 'dim_date': create_dim_date(dict_of_df), + 'dim_location': create_dim_location(dict_of_df), + 'dim_staff': create_dim_staff(dict_of_df), + 'dim_design': create_dim_design(dict_of_df)} + + + mutable_df_dict = { + 'fact_sales_order': create_fact_sales_order(dict_of_df), + 'fact_purchase_order': create_fact_purchase_orders(dict_of_df), + 'fact_payment': create_fact_payment(dict_of_df), + 'dim_currency': create_dim_currency(dict_of_df)} + + status = process_to_parquet_and_upload_to_s3( + existing_s3_files, + immutable_df_dict, + mutable_df_dict, + bucket + ) + + if not status['uploaded']: + logger.info("No dataframes written to the bucket.") + return { + 'statusCode': 204, + "body": json.dumps("No files where uploaded."), + } + + return { + "statusCode": 200, + "body": json.dumps( + f"""Parquet files processed for {', '.join(status['uploaded'])} and uploaded successfully.{ + 'The following tables were not uploaded: '+', '.join([status['not_uploaded']]) if status['not_uploaded'] else ''}""" + ), + } + + except Exception as e: + logger.error(f"Error: {e}", exc_info=True) + return {"statusCode": 500, "body": json.dumps("Internal server error.")} + finally: + if db: + db.close() + + +def process_to_parquet_and_upload_to_s3(existing_s3_files, + immutable_df_dict, + mutable_df_dict, + bucket, + client=boto3.client('s3')): + status = {'uploaded': [], + 'not_uploaded': []} + + for table_name, df in immutable_df_dict.items(): + if table_name in existing_s3_files: + status['not_uploaded'].append(table_name) + else: + parquet_file = df.to_parquet(f'{table_name}.parquet', engine='pyarrow') #or fastparquet + client.upload_file(parquet_file, bucket, f'{table_name}.parquet') + status['uploaded'].append(table_name) + + for table_name, df in mutable_df_dict.items(): + s3_key = datetime.strftime( + datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.parquet") + parquet_file = df.to_parquet(f'{table_name}.parquet', engine='pyarrow') #or fastparquet + client.upload_file(parquet_file, bucket, s3_key) + status['uploaded'].append(table_name) + + + return status +def retrieve_secrets(): + secret_name = "bentley-secrets" + region_name = "eu-west-2" + + # Create a Secrets Manager client + session = boto3.session.Session() + client = session.client(service_name="secretsmanager", region_name=region_name) + + try: + get_secret_value_response = client.get_secret_value(SecretId=secret_name) + except ClientError as e: + logger.error(f"Failed to retrieve secret {secret_name}: {str(e)}") + raise e + except KeyError: + logger.error(f"Secret {secret_name} does not contain a SecretString") + raise ValueError(f"Secret {secret_name} does not contain a SecretString") + + return get_secret_value_response["SecretString"] + + +def connect_to_database() -> Connection: + try: + secrets = json.loads(retrieve_secrets()) + host = secrets["host"] + port = secrets["port"] + user = secrets["user"] + password = secrets["password"] + database = secrets["database"] + + return Connection( + database=database, user=user, password=password, host=host, port=port + ) + except InterfaceError as i: + logger.error(f"Interface error: {i}") + raise DBConnectionException("Failed to connect to database") + + def read_from_s3_subfolder_to_df(tables, bucket, client=boto3.client("s3")): table_dfs = {} for table in tables: @@ -76,10 +175,27 @@ def read_from_s3_subfolder_to_df(tables, bucket, client=boto3.client("s3")): table_dfs[table] = pd.concat(list_of_df) return table_dfs -def transform_bucket(client=boto3.client("s3")): +def bucket_name(bucket_prefix, client=boto3.client("s3")): response = client.list_buckets() bucket_filter = [ - bucket["Name"] for bucket in response["Buckets"] if "transform" in bucket["Name"] + bucket["Name"] for bucket in response["Buckets"] if bucket_prefix in bucket["Name"] ] return bucket_filter[0] + +def list_existing_s3_files(bucket_name, client=boto3.client("s3")): + logging.info("Listing existing S3 files") + + try: + response = client.list_objects_v2(Bucket=bucket_name) + + if "Contents" in response: + existing_files = [obj["Key"] for obj in response["Contents"]] + else: + logger.error("The bucket is empty") + return None + + except ClientError as e: + logger.error(f"Error listing S3 objects: {e}") + + return existing_files \ No newline at end of file -- cgit v1.2.3