diff options
| author | Ellie <ecsymonds@gmail.com> | 2024-08-23 17:04:08 +0100 |
|---|---|---|
| committer | Ellie <ecsymonds@gmail.com> | 2024-08-23 17:04:08 +0100 |
| commit | 500ebf24c746ec87c9c846f5a82d638cc23983b9 (patch) | |
| tree | 30beb7e2cdef53ba0e8dc3acc24dd80be8841f91 /src | |
| parent | 0f8f376fe806ea72f056356cc043213f61159697 (diff) | |
| download | de-project-bentley-500ebf24c746ec87c9c846f5a82d638cc23983b9.tar.gz de-project-bentley-500ebf24c746ec87c9c846f5a82d638cc23983b9.zip | |
add amendments for upload_dfs_to_db
Diffstat (limited to 'src')
| -rw-r--r-- | src/load_lambda.py | 47 |
1 files changed, 34 insertions, 13 deletions
diff --git a/src/load_lambda.py b/src/load_lambda.py index 2dc90ba..8eaea32 100644 --- a/src/load_lambda.py +++ b/src/load_lambda.py @@ -24,7 +24,7 @@ logging.getLogger("botocore").setLevel(logging.INFO) def lambda_handler(event, context): try: uploaded_tables = upload_dfs_to_database() - if not uploaded_tables: + if not uploaded_tables["uploaded"]: return { "statusCode": 200, "body": json.dumps("No dataframes were uploaded."), @@ -33,7 +33,7 @@ def lambda_handler(event, context): "statusCode": 200, "body": json.dumps( f"""The following dataframes were uploaded successfully: - {', '.join(uploaded_tables)} .""" + {uploaded_tables["uploaded"]} .""" ), } except Exception as e: @@ -133,17 +133,38 @@ def convert_parquet_files_to_dfs(bucket_name=None, client=None): return dfs def upload_dfs_to_database(): - uploaded = [] + upload_status = {"uploaded": [], "not_uploaded": []} dict_of_dfs = convert_parquet_files_to_dfs() db_engine = connect_to_db_and_return_engine() - try: - for table_name, df in dict_of_dfs.items(): - df.to_sql(table_name, con=db_engine, if_exists="replace", index=False) - uploaded.append(table_name) - except Exception as e: - logger.error(f"Error uploading dataframes: {e}") - raise + immutable_df_dict = ["dim_counterparty.parquet", + "dim_date.parquet", #this needs to be mutable + "dim_location.parquet", + "dim_staff.parquet", + "dim_design.parquet"] + mutable_df_dict = ["fact_sales_order", + "fact_purchase_order", + "fact_payment", + "dim_currency"] + + for file_name, df in dict_of_dfs.items(): + if file_name in immutable_df_dict: + table_name = file_name.split(".")[0] + try: + df.to_sql(table_name, con=db_engine, schema="project_team_2", if_exists="overwrite", index=False) + upload_status["uploaded"].append(table_name) + except Exception as e: + logger.error(f"Error uploading dataframe {file_name} to database: {e}") + raise + elif file_name.rsplit('_', 1)[0] in mutable_df_dict: + table_name = file_name.rsplit('_', 1)[0] + try: + df.to_sql(table_name, 
con=db_engine, schema="project_team_2", if_exists="overwrite", index=False) + upload_status["uploaded"].append(table_name) + except Exception as e: + logger.error(f"Error uploading dataframe {file_name} to database: {e}") + raise + else: + upload_status["not_uploaded"].append(file_name) + logger.error(f"{file_name} does not correspond with table in database") db_engine.dispose() - return uploaded - - # aiming to return a list of uploaded tables
\ No newline at end of file + return upload_status
\ No newline at end of file |
