about | summary | refs | log | tree | commit | diff | stats
path: root/src/load_lambda.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/load_lambda.py')
-rw-r--r-- src/load_lambda.py 47
1 file changed, 34 insertions, 13 deletions
diff --git a/src/load_lambda.py b/src/load_lambda.py
index 2dc90ba..8eaea32 100644
--- a/src/load_lambda.py
+++ b/src/load_lambda.py
@@ -24,7 +24,7 @@ logging.getLogger("botocore").setLevel(logging.INFO)
def lambda_handler(event, context):
try:
uploaded_tables = upload_dfs_to_database()
- if not uploaded_tables:
+ if not uploaded_tables["uploaded"]:
return {
"statusCode": 200,
"body": json.dumps("No dataframes were uploaded."),
@@ -33,7 +33,7 @@ def lambda_handler(event, context):
"statusCode": 200,
"body": json.dumps(
f"""The following dataframes were uploaded successfully:
- {', '.join(uploaded_tables)} ."""
+ {uploaded_tables["uploaded"]} ."""
),
}
except Exception as e:
@@ -133,17 +133,38 @@ def convert_parquet_files_to_dfs(bucket_name=None, client=None):
return dfs
def upload_dfs_to_database():
- uploaded = []
+ upload_status = {"uploaded": [], "not_uploaded": []}
dict_of_dfs = convert_parquet_files_to_dfs()
db_engine = connect_to_db_and_return_engine()
- try:
- for table_name, df in dict_of_dfs.items():
- df.to_sql(table_name, con=db_engine, if_exists="replace", index=False)
- uploaded.append(table_name)
- except Exception as e:
- logger.error(f"Error uploading dataframes: {e}")
- raise
+ immutable_df_dict = ["dim_counterparty.parquet",
+ "dim_date.parquet", #this needs to be mutable
+ "dim_location.parquet",
+ "dim_staff.parquet",
+ "dim_design.parquet"]
+ mutable_df_dict = ["fact_sales_order",
+ "fact_purchase_order",
+ "fact_payment",
+ "dim_currency"]
+
+ for file_name, df in dict_of_dfs.items():
+ if file_name in immutable_df_dict:
+ table_name = file_name.split(".")[0]
+ try:
+ df.to_sql(table_name, con=db_engine, schema="project_team_2", if_exists="replace", index=False)  # if_exists must be "fail", "replace" or "append"
+ upload_status["uploaded"].append(table_name)
+ except Exception as e:
+ logger.error(f"Error uploading dataframe {file_name} to database: {e}")
+ raise
+ elif file_name.rsplit('_', 1)[0] in mutable_df_dict:
+ table_name = file_name.rsplit('_', 1)[0]
+ try:
+ df.to_sql(table_name, con=db_engine, schema="project_team_2", if_exists="replace", index=False)
+ upload_status["uploaded"].append(table_name)
+ except Exception as e:
+ logger.error(f"Error uploading dataframe {file_name} to database: {e}")
+ raise
+ else:
+ upload_status["not_uploaded"].append(file_name)
+ logger.error(f"{file_name} does not correspond with table in database")
db_engine.dispose()
- return uploaded
-
- # aiming to return a list of uploaded tables \ No newline at end of file
+ return upload_status \ No newline at end of file
git.ajschof.me — hosted by ajschofield — powered by cgit