aboutsummaryrefslogtreecommitdiffstats
path: root/src/load_lambda.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/load_lambda.py')
-rw-r--r--src/load_lambda.py280
1 files changed, 278 insertions, 2 deletions
diff --git a/src/load_lambda.py b/src/load_lambda.py
index c6a8e60..86189dc 100644
--- a/src/load_lambda.py
+++ b/src/load_lambda.py
@@ -1,2 +1,278 @@
-def lambda_handler():
- pass
+import boto3
+from botocore.exceptions import ClientError
+import pandas as pd
+import pyarrow.parquet as pq
+from io import BytesIO
+import logging
+import json
+import traceback
+from sqlalchemy import create_engine
+from datetime import datetime as dt
+import re
+
+logger = logging.getLogger(__name__)
+
+logging.basicConfig(
+ format="{asctime} - {levelname} - {message}",
+ style="{",
+ datefmt="%Y-%m-%d %H:%M",
+ level=logging.INFO,
+)
+# logging.getLogger("botocore").setLevel(logging.INFO)
+# logging.getLogger('sqlalchemy.engine').setLevel(logging.DEBUG)
+
+
+def lambda_handler(event, context):
+ try:
+ uploaded_tables = upload_dfs_to_database()
+ if uploaded_tables["not_uploaded"]:
+ return {
+ "statusCode": 200,
+ "body": json.dumps("No dataframes were uploaded."),
+ }
+ elif uploaded_tables["uploaded"]:
+ return {
+ "statusCode": 200,
+ "body": json.dumps(
+ f"""The following dataframes were uploaded successfully:
+ {uploaded_tables["uploaded"]} ."""
+ ),
+ }
+ else:
+ logger.error(f"error", exc_info=True)
+ return {"error"}
+ except Exception as e:
+ logger.error({e}, exc_info=True)
+ return {"statusCode": 500, "body": {e}}
+
+
+def retrieve_secrets(client=None, secret_name=None):
+ session = boto3.session.Session()
+ region_name = "eu-west-2"
+
+ if secret_name == None:
+ secret_name = "bentley-RDS-credentials"
+ if client == None:
+ client = session.client(service_name="secretsmanager", region_name=region_name)
+
+ try:
+ get_secret_value_response = client.get_secret_value(SecretId=secret_name)
+ except ClientError as e:
+ logger.error(
+ f"Failed to retrieve secret {secret_name}: {str(e)}", exc_info=True
+ )
+ raise e
+ except KeyError:
+ logger.error(
+ f"Secret {secret_name} does not contain a SecretString", exc_info=True
+ )
+ raise ValueError(f"Secret {secret_name} does not contain a SecretString")
+
+ return get_secret_value_response["SecretString"]
+
+
+# connect to database, slightly different way of doing it, to allow manipulation through pandas
+
+
+def connect_to_db_and_return_engine(sm_secret=None):
+ if sm_secret is None:
+ sm_secret = json.loads(retrieve_secrets())
+
+ try:
+ secrets = sm_secret
+ host = secrets["host"]
+ port = secrets["port"]
+ user = secrets["user"]
+ password = secrets["password"]
+ database = secrets["database"]
+ conn_str = f"postgresql+pg8000://{user}:{password}@{host}:{port}/{database}"
+ # interface between python (pandas) and SQL
+ engine = create_engine(conn_str)
+ return engine
+ except Exception as e:
+ logger.error(f"Interface error: {e}", exc_info=True)
+ raise RuntimeError("Failed to create database engine")
+
+
+# get transform bucket
+def get_transform_bucket(client=None):
+ if client is None:
+ client = boto3.client("s3")
+ try:
+ response = client.list_buckets()
+ except ClientError as e:
+ logger.error(f"Error listing S3 buckets: {e}", exc_info=True)
+ raise RuntimeError("Error listing S3 buckets")
+
+ transform_bucket_filter = [
+ bucket["Name"]
+ for bucket in response["Buckets"]
+ if "transform" in bucket["Name"]
+ ]
+
+ if not transform_bucket_filter:
+ logger.error("No transform bucket found", exc_info=True)
+ raise ValueError("No transform bucket found")
+
+ return transform_bucket_filter[0]
+
+
+# list and then retrieve parquet files from S3 bucket
+# convert parquet files into dataframes
+# return a dictionary of dataframes with name as key, and dataframe object as value
+
+
+def get_latest_timestamp(existing_files):
+ if existing_files:
+ all_datetimes = []
+ for file_name in existing_files:
+ match = re.search(r"\/(.+/).+_(.+)\.parquet", file_name)
+ if match:
+ datetime_str = "".join(match.group(1, 2))
+ all_datetimes.append(dt.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S"))
+ return max(all_datetimes) if all_datetimes else dt.min
+ return existing_files
+
+
+def convert_parquet_files_to_dfs(bucket_name=None, client=None):
+ mutable_df_dict = [
+ "dim_currency",
+ "fact_sales_order",
+ "fact_purchase_order",
+ "fact_payment",
+ ]
+
+ try:
+ if client is None:
+ client = boto3.client("s3")
+ if bucket_name is None:
+ bucket_name = get_transform_bucket()
+ files = client.list_objects_v2(Bucket=bucket_name)
+
+ dfs = {}
+ if "Contents" in files:
+ s3_key_list = [file["Key"] for file in files["Contents"]]
+ immutables_l = []
+ mutables_d = {prefix: [] for prefix in mutable_df_dict}
+ for tab, s3_key in mutables_d.items():
+ for file in s3_key_list:
+ if tab in file:
+ s3_key.append(file)
+ elif "2024" not in file:
+ immutables_l.append(file)
+ else:
+ continue
+ immutables_l = list(set(immutables_l))
+ latest_s3_keys = []
+ for k, v in mutables_d.items():
+ latest_s3_keys.append(
+ dt.strftime(
+ get_latest_timestamp(v), f"{k}/%Y/%m/%d/{k}_%H:%M:%S.parquet"
+ )
+ )
+ for file_key in immutables_l + latest_s3_keys:
+ try:
+ file_obj = client.get_object(Bucket=bucket_name, Key=file_key)
+ parquet_file = pq.ParquetFile(BytesIO(file_obj["Body"].read()))
+ df = parquet_file.read().to_pandas()
+ # >> can't do 'any' (default) because we lose rows in dim_location
+ df_without_nulls = df.dropna(how="all")
+ # print("df_without_nulls", df_without_nulls)
+ # print("type", type(df_without_nulls))
+ # print(df_without_nulls.columns)
+ dfs[file_key] = df_without_nulls
+ except ClientError as e:
+ logger.error(
+ f"Unable to retrieve S3 object {file_key}: {e}", exc_info=True
+ )
+ except Exception as e:
+ logger.error(
+ f"Unable to process file {file_key}: {e}", exc_info=True
+ )
+ else:
+ logger.error(f"No files found in {bucket_name}.", exc_info=True)
+ return {}
+ except ValueError as value_error:
+ logger.error(f"Unable to list objects: {value_error}", exc_info=True)
+ raise
+ except ClientError as client_error:
+ logger.error(f"Unable to list objects: {client_error}", exc_info=True)
+ raise
+ return dfs
+
+
+def upload_dfs_to_database():
+ upload_status = {"uploaded": [], "not_uploaded": []}
+ dict_of_dfs = convert_parquet_files_to_dfs()
+ db_engine = connect_to_db_and_return_engine()
+ immutable_df_dict = [
+ "dim_counterparty.parquet",
+ "dim_date.parquet", # this needs to be mutable
+ "dim_location.parquet",
+ "dim_staff.parquet",
+ "dim_design.parquet",
+ "dim_transaction.parquet", # This one was missing,
+ "dim_payment_type.parquet",
+ ]
+ mutable_df_dict = [
+ "dim_currency",
+ "fact_sales_order",
+ "fact_purchase_order",
+ "fact_payment",
+ ]
+ with db_engine.begin() as connection:
+ for file_name, df in dict_of_dfs.items():
+ print(df.dtypes, "dtypes")
+ print(df.head())
+ print(file_name, "<<< FILE NAME")
+ print(immutable_df_dict, "<<<IMMUTABLE_DF_DICT")
+ if file_name in immutable_df_dict:
+ table_name = file_name.split(".")[0]
+ print(table_name, "<<<<<")
+ try:
+ df.to_sql(
+ table_name,
+ con=connection,
+ schema="project_team_2",
+ if_exists="append",
+ index=False,
+ )
+ upload_status["uploaded"].append(table_name)
+ print(upload_status)
+ except Exception as e:
+ logger.error(
+ f"Error uploading dataframe {file_name} to database: {e}",
+ exc_info=True,
+ )
+ raise
+ elif file_name.split("/")[0] in mutable_df_dict:
+ table_name = file_name.split("/")[0]
+ print(table_name, "<<<<<<<TABLE NAME")
+ try:
+ df.to_sql(
+ table_name,
+ con=connection,
+ schema="project_team_2",
+ if_exists="append",
+ index=False,
+ )
+ upload_status["uploaded"].append(table_name)
+ except Exception as e:
+ logger.error(
+ f"Error uploading dataframe {file_name} to database: {e}",
+ exc_info=True,
+ )
+ raise
+ else:
+ upload_status["not_uploaded"].append(file_name)
+ logger.error(
+ f"{file_name} does not correspond with table in database",
+ exc_info=True,
+ )
+ print(upload_status)
+ db_engine.dispose()
+ return upload_status
+
+
+if __name__ == "__main__":
+ lambda_handler(None, None)
git.ajschof.me — hosted by ajschofield — powered by cgit