diff options
| author | Ang Bel <anzelikabelotelova@Anzelikas-MacBook-Air.local> | 2024-08-28 22:46:00 +0100 |
|---|---|---|
| committer | Ang Bel <anzelikabelotelova@Anzelikas-MacBook-Air.local> | 2024-08-28 22:46:00 +0100 |
| commit | d396cd95d660fb76188ef887fc268d20aeeb5352 (patch) | |
| tree | 52fced39ff487d4053a3d43c24eb8bcf403c1719 /src | |
| parent | 6235a2bb04b60d57a41196b07bbf0296920c6980 (diff) | |
| download | de-project-bentley-d396cd95d660fb76188ef887fc268d20aeeb5352.tar.gz de-project-bentley-d396cd95d660fb76188ef887fc268d20aeeb5352.zip | |
fix: adds missing dataframes and resolves tables upload to end data warehouse in case the table is empty
Diffstat (limited to 'src')
| -rw-r--r-- | src/load_lambda.py | 24 | ||||
| -rw-r--r-- | src/transform_lambda/dataframes.py | 19 | ||||
| -rw-r--r-- | src/transform_lambda/transform_lambda.py | 4 |
3 files changed, 30 insertions, 17 deletions
diff --git a/src/load_lambda.py b/src/load_lambda.py index cdcf105..8f921b8 100644 --- a/src/load_lambda.py +++ b/src/load_lambda.py @@ -161,18 +161,15 @@ def convert_parquet_files_to_dfs(bucket_name=None, client=None): else: continue immutables_l = list(set(immutables_l)) - print(mutables_d,'mutables_d') latest_s3_keys = [] for k,v in mutables_d.items(): latest_s3_keys.append(dt.strftime(get_latest_timestamp(v), f"{k}/%Y/%m/%d/{k}_%H:%M:%S.parquet")) - print(latest_s3_keys,'latest') - print(immutables_l,'immutables_l') - for file_key in latest_s3_keys+immutables_l: + for file_key in immutables_l+latest_s3_keys: try: file_obj = client.get_object(Bucket=bucket_name, Key=file_key) parquet_file = pq.ParquetFile(BytesIO(file_obj["Body"].read())) df = parquet_file.read().to_pandas() - df_without_nulls = df.dropna() + df_without_nulls = df.dropna(how='all') #>> can't do 'any' (default) because we lose rows in dim_location #print("df_without_nulls", df_without_nulls) #print("type", type(df_without_nulls)) #print(df_without_nulls.columns) @@ -202,12 +199,14 @@ def upload_dfs_to_database(): # "dim_date.parquet", # this needs to be mutable # "dim_location.parquet", # "dim_staff.parquet", - # "dim_design.parquet" + # "dim_design.parquet", + # 'dim_transaction.parquet' #This one was missing, + 'dim_payment_type.parquet' ] mutable_df_dict = [ - "dim_currency", - "fact_sales_order", - "fact_purchase_order", + # "dim_currency", + # "fact_sales_order", + # "fact_purchase_order", "fact_payment" ] @@ -215,7 +214,9 @@ def upload_dfs_to_database(): for file_name, df in dict_of_dfs.items(): print(df.dtypes, "dtypes") print(df.head()) - if file_name in immutable_df_dict: + print(file_name,"<<< FILE NAME") + print(immutable_df_dict,"<<<IMMUTABLE_DF_DICT") + if file_name in immutable_df_dict: table_name = file_name.split(".")[0] print(table_name, "<<<<<") try: @@ -248,7 +249,8 @@ def upload_dfs_to_database(): raise else: upload_status["not_uploaded"].append(file_name) - logger.error(f"{file_name} does not correspond with table in database", exc_info=True) + logger.error(f"{file_name} does not correspond with table in database", exc_info=True) + print(upload_status) db_engine.dispose() return upload_status diff --git a/src/transform_lambda/dataframes.py b/src/transform_lambda/dataframes.py index e89a6b2..c823b87 100644 --- a/src/transform_lambda/dataframes.py +++ b/src/transform_lambda/dataframes.py @@ -19,7 +19,6 @@ import requests # no test, same as fact_payment def create_fact_sales_order(dict_of_df): df_sales = dict_of_df["sales_order"].rename(columns={"staff_id": "sales_staff_id"}) - df_sales.index.name = "sales_record_id" df_sales["created_date"] = df_sales["created_at"].astype("datetime64[ns]").dt.date df_sales["created_time"] = ( @@ -55,9 +54,11 @@ def create_fact_sales_order(dict_of_df): "agreed_delivery_location_id" ], ] + fact_sales.convert_dtypes() + fact_sales.index = pd.RangeIndex(1, len(fact_sales.index) + 1) + fact_sales.index.name = "sales_record_id" fact_sales.reset_index(inplace=True) - - + fact_sales.dropna(inplace=True) return fact_sales @@ -66,7 +67,6 @@ def create_fact_sales_order(dict_of_df): def create_fact_purchase_orders(dict_of_df): df_po = dict_of_df["purchase_order"] - df_po.index.name = "purchase_record_id" df_po["created_date"] = df_po["created_at"].astype("datetime64[ns]").dt.date df_po["created_time"] = ( df_po["created_at"].astype("datetime64[ns]").dt.floor("s").dt.time @@ -100,7 +100,11 @@ def create_fact_purchase_orders(dict_of_df): ] ] + fact_purchase_order.convert_dtypes() + fact_purchase_order.index = pd.RangeIndex(1, len(fact_purchase_order.index) + 1) + fact_purchase_order.index.name = "purchase_record_id" fact_purchase_order.reset_index(inplace=True) + fact_purchase_order.dropna(inplace=True) return fact_purchase_order @@ -109,7 +113,6 @@ def create_fact_purchase_orders(dict_of_df): def create_fact_payment(dict_of_df): df_payment = dict_of_df["payment"] - df_payment.index.name = "payment_record_id" df_payment["created_date"] = ( df_payment["created_at"].astype("datetime64[ns]").dt.date ) @@ -141,7 +144,12 @@ def create_fact_payment(dict_of_df): "payment_date" ] ] + fact_payment.convert_dtypes() + fact_payment.index = pd.RangeIndex(1, len(fact_payment.index) + 1) + fact_payment.index.name = "payment_record_id" fact_payment.reset_index(inplace=True) + fact_payment.dropna(inplace=True) + fact_payment = fact_payment.astype({'currency_id':'int','payment_id':'int'}) return fact_payment @@ -157,6 +165,7 @@ def create_dim_transaction(dict_of_df): "purchase_order_id" ] ] + #dim_transaction = dim_transaction.astype({"sales_order_id":"Int64","purchase_order_id":"Int64"}) return dim_transaction diff --git a/src/transform_lambda/transform_lambda.py b/src/transform_lambda/transform_lambda.py index 0b5748b..5ea8cf0 100644 --- a/src/transform_lambda/transform_lambda.py +++ b/src/transform_lambda/transform_lambda.py @@ -65,13 +65,15 @@ def lambda_handler(event, context): "dim_location": create_dim_location(dict_of_df), "dim_staff": create_dim_staff(dict_of_df), "dim_design": create_dim_design(dict_of_df), + "dim_transaction": create_dim_transaction(dict_of_df), + "dim_payment_type": create_dim_payment_type(dict_of_df) } mutable_df_dict = { "fact_sales_order": create_fact_sales_order(dict_of_df), "fact_purchase_order": create_fact_purchase_orders(dict_of_df), "fact_payment": create_fact_payment(dict_of_df), - "dim_currency": create_dim_currency(dict_of_df), + "dim_currency": create_dim_currency(dict_of_df) } print(immutable_df_dict.values()) print(mutable_df_dict.values()) |
