diff options
Diffstat (limited to 'src/load_lambda.py')
| -rw-r--r-- | src/load_lambda.py | 174 |
1 files changed, 108 insertions, 66 deletions
diff --git a/src/load_lambda.py b/src/load_lambda.py index 272cb8c..cdcf105 100644 --- a/src/load_lambda.py +++ b/src/load_lambda.py @@ -7,7 +7,8 @@ import logging import json import traceback from sqlalchemy import create_engine - +from datetime import datetime as dt +import re logger = logging.getLogger(__name__) @@ -15,10 +16,10 @@ logging.basicConfig( format="{asctime} - {levelname} - {message}", style="{", datefmt="%Y-%m-%d %H:%M", - level=logging.DEBUG, + level=logging.INFO, ) - -logging.getLogger("botocore").setLevel(logging.INFO) +# logging.getLogger("botocore").setLevel(logging.INFO) +# logging.getLogger('sqlalchemy.engine').setLevel(logging.DEBUG) def lambda_handler(event, context): @@ -38,10 +39,10 @@ def lambda_handler(event, context): ), } else: - logger.error(f"error") + logger.error(f"error", exc_info=True) return {"error"} except Exception as e: - logger.error({e}) + logger.error({e}, exc_info=True) return {"statusCode": 500, "body": {e}} @@ -58,10 +59,10 @@ def retrieve_secrets(client=None, secret_name=None): get_secret_value_response = client.get_secret_value(SecretId=secret_name) print(get_secret_value_response) except ClientError as e: - logger.error(f"Failed to retrieve secret {secret_name}: {str(e)}") + logger.error(f"Failed to retrieve secret {secret_name}: {str(e)}", exc_info=True) raise e except KeyError: - logger.error(f"Secret {secret_name} does not contain a SecretString") + logger.error(f"Secret {secret_name} does not contain a SecretString", exc_info=True) raise ValueError(f"Secret {secret_name} does not contain a SecretString") return get_secret_value_response["SecretString"] @@ -86,7 +87,7 @@ def connect_to_db_and_return_engine(sm_secret=None): engine = create_engine(conn_str) return engine except Exception as e: - logger.error(f"Interface error: {e}") + logger.error(f"Interface error: {e}", exc_info=True) raise RuntimeError("Failed to create database engine") @@ -97,7 +98,7 @@ def get_transform_bucket(client=None): try: response = client.list_buckets() except ClientError as e: - logger.error(f"Error listing S3 buckets: {e}") + logger.error(f"Error listing S3 buckets: {e}", exc_info=True) raise RuntimeError("Error listing S3 buckets") transform_bucket_filter = [ @@ -107,7 +108,7 @@ def get_transform_bucket(client=None): ] if not transform_bucket_filter: - logger.error("No transform bucket found") + logger.error("No transform bucket found", exc_info=True) raise ValueError("No transform bucket found") return transform_bucket_filter[0] @@ -117,41 +118,78 @@ def get_transform_bucket(client=None): # convert parquet files into dataframes # return a dictionary of dataframes with name as key, and dataframe object as value +def get_latest_timestamp(existing_files): + if existing_files: + all_datetimes = [] + for file_name in existing_files: + match = re.search(r"\/(.+/).+_(.+)\.parquet", file_name) + if match: + datetime_str = "".join(match.group(1, 2)) + all_datetimes.append( + dt.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S") + ) + return max(all_datetimes) if all_datetimes else dt.min + return existing_files def convert_parquet_files_to_dfs(bucket_name=None, client=None): + mutable_df_dict = [ + "dim_currency", + "fact_sales_order", + "fact_purchase_order", + "fact_payment" + + ] + try: if client is None: client = boto3.client("s3") if bucket_name is None: bucket_name = get_transform_bucket() files = client.list_objects_v2(Bucket=bucket_name) - + dfs = {} if "Contents" in files: - for file in files["Contents"]: - file_key = file["Key"] + s3_key_list = [file["Key"]for file in files["Contents"]] + immutables_l = [] + mutables_d = {prefix:[] for prefix in mutable_df_dict} + for tab, s3_key in mutables_d.items(): + for file in s3_key_list: + if tab in file: + s3_key.append(file) + elif "2024" not in file: + immutables_l.append(file) + else: + continue + immutables_l = list(set(immutables_l)) + print(mutables_d,'mutables_d') + latest_s3_keys = [] + for k,v in mutables_d.items(): + latest_s3_keys.append(dt.strftime(get_latest_timestamp(v), f"{k}/%Y/%m/%d/{k}_%H:%M:%S.parquet")) + print(latest_s3_keys,'latest') + print(immutables_l,'immutables_l') + for file_key in latest_s3_keys+immutables_l: try: file_obj = client.get_object(Bucket=bucket_name, Key=file_key) parquet_file = pq.ParquetFile(BytesIO(file_obj["Body"].read())) df = parquet_file.read().to_pandas() - print("df", df) - print("type", type(df)) - print(df.columns) - dfs[file_key] = df + df_without_nulls = df.dropna() + #print("df_without_nulls", df_without_nulls) + #print("type", type(df_without_nulls)) + #print(df_without_nulls.columns) + dfs[file_key] = df_without_nulls except ClientError as e: - logger.error(f"Unable to retrieve S3 object {file_key}: {e}") + logger.error(f"Unable to retrieve S3 object {file_key}: {e}", exc_info=True) except Exception as e: - logger.error(f"Unable to process file {file_key}: {e}") + logger.error(f"Unable to process file {file_key}: {e}", exc_info=True) else: - logger.error(f"No files found in {bucket_name}.") + logger.error(f"No files found in {bucket_name}.", exc_info=True) return {} except ValueError as value_error: - logger.error(f"Unable to list objects: {value_error}") + logger.error(f"Unable to list objects: {value_error}", exc_info=True) raise except ClientError as client_error: - logger.error(f"Unable to list objects: {client_error}") + logger.error(f"Unable to list objects: {client_error}", exc_info=True) raise - print() return dfs @@ -160,53 +198,57 @@ def upload_dfs_to_database(): dict_of_dfs = convert_parquet_files_to_dfs() db_engine = connect_to_db_and_return_engine() immutable_df_dict = [ - "dim_counterparty.parquet", - "dim_date.parquet", # this needs to be mutable - "dim_location.parquet", - "dim_staff.parquet", - "dim_design.parquet" + # #"dim_counterparty.parquet", + # "dim_date.parquet", # this needs to be mutable + # "dim_location.parquet", + # "dim_staff.parquet", + # "dim_design.parquet" ] mutable_df_dict = [ + "dim_currency", "fact_sales_order", "fact_purchase_order", - "fact_payment", - "dim_currency" + "fact_payment" + ] - - for file_name, df in dict_of_dfs.items(): - print(df) - if file_name in immutable_df_dict: - table_name = file_name.split(".")[0] - print(table_name, "<<<<<") - try: - df.to_sql( - table_name, - con=db_engine, - schema="project_team_2", - if_exists="append", - index=False, - ) - upload_status["uploaded"].append(table_name) - except Exception as e: - logger.error(f"Error uploading dataframe {file_name} to database: {e}") - raise - elif file_name.rsplit("_", 1)[0] in mutable_df_dict: - table_name = file_name.rsplit("_", 1)[0] - try: - df.to_sql( - table_name, - con=db_engine, - schema="project_team_2", - if_exists="append", - index=False, - ) - upload_status["uploaded"].append(table_name) - except Exception as e: - logger.error(f"Error uploading dataframe {file_name} to database: {e}") - raise - else: - upload_status["not_uploaded"].append(file_name) - logger.error(f"{file_name} does not correspond with table in database") + with db_engine.begin() as connection: + for file_name, df in dict_of_dfs.items(): + print(df.dtypes, "dtypes") + print(df.head()) + if file_name in immutable_df_dict: + table_name = file_name.split(".")[0] + print(table_name, "<<<<<") + try: + df.to_sql( + table_name, + con=connection, + schema="project_team_2", + if_exists="append", + index=False, + ) + upload_status["uploaded"].append(table_name) + print(upload_status) + except Exception as e: + logger.error(f"Error uploading dataframe {file_name} to database: {e}", exc_info=True) + raise + elif file_name.split("/")[0] in mutable_df_dict: + table_name = file_name.split("/")[0] + print(table_name, "<<<<<<<TABLE NAME") + try: + df.to_sql( + table_name, + con=connection, + schema="project_team_2", + if_exists="append", + index=False, + ) + upload_status["uploaded"].append(table_name) + except Exception as e: + logger.error(f"Error uploading dataframe {file_name} to database: {e}", exc_info=True) + raise + else: + upload_status["not_uploaded"].append(file_name) + logger.error(f"{file_name} does not correspond with table in database", exc_info=True) db_engine.dispose() return upload_status |
