aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAng Bel <anzelikabelotelova@Anzelikas-MacBook-Air.local>2024-08-28 22:46:00 +0100
committerAng Bel <anzelikabelotelova@Anzelikas-MacBook-Air.local>2024-08-28 22:46:00 +0100
commitd396cd95d660fb76188ef887fc268d20aeeb5352 (patch)
tree52fced39ff487d4053a3d43c24eb8bcf403c1719
parent6235a2bb04b60d57a41196b07bbf0296920c6980 (diff)
downloadde-project-bentley-d396cd95d660fb76188ef887fc268d20aeeb5352.tar.gz
de-project-bentley-d396cd95d660fb76188ef887fc268d20aeeb5352.zip
fix: adds missing dataframes and resolves tables upload to end data warehouse in case the table is empty
-rw-r--r--.gitignore6
-rw-r--r--src/load_lambda.py24
-rw-r--r--src/transform_lambda/dataframes.py19
-rw-r--r--src/transform_lambda/transform_lambda.py4
4 files changed, 35 insertions, 18 deletions
diff --git a/.gitignore b/.gitignore
index 6aa03fc..480ae4b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,4 +14,8 @@ __pycache__/
# OS-Related Files
.DS_Store
-venv \ No newline at end of file
+venv
+
+#files
+/dim_*
+/fact_* \ No newline at end of file
diff --git a/src/load_lambda.py b/src/load_lambda.py
index cdcf105..8f921b8 100644
--- a/src/load_lambda.py
+++ b/src/load_lambda.py
@@ -161,18 +161,15 @@ def convert_parquet_files_to_dfs(bucket_name=None, client=None):
else:
continue
immutables_l = list(set(immutables_l))
- print(mutables_d,'mutables_d')
latest_s3_keys = []
for k,v in mutables_d.items():
latest_s3_keys.append(dt.strftime(get_latest_timestamp(v), f"{k}/%Y/%m/%d/{k}_%H:%M:%S.parquet"))
- print(latest_s3_keys,'latest')
- print(immutables_l,'immutables_l')
- for file_key in latest_s3_keys+immutables_l:
+ for file_key in immutables_l+latest_s3_keys:
try:
file_obj = client.get_object(Bucket=bucket_name, Key=file_key)
parquet_file = pq.ParquetFile(BytesIO(file_obj["Body"].read()))
df = parquet_file.read().to_pandas()
- df_without_nulls = df.dropna()
+ df_without_nulls = df.dropna(how='all') #>> can't do 'any' (default) because we lose rows in dim_location
#print("df_without_nulls", df_without_nulls)
#print("type", type(df_without_nulls))
#print(df_without_nulls.columns)
@@ -202,12 +199,14 @@ def upload_dfs_to_database():
# "dim_date.parquet", # this needs to be mutable
# "dim_location.parquet",
# "dim_staff.parquet",
- # "dim_design.parquet"
+ # "dim_design.parquet",
+ # 'dim_transaction.parquet' #This one was missing,
+ 'dim_payment_type.parquet'
]
mutable_df_dict = [
- "dim_currency",
- "fact_sales_order",
- "fact_purchase_order",
+ # "dim_currency",
+ # "fact_sales_order",
+ # "fact_purchase_order",
"fact_payment"
]
@@ -215,7 +214,9 @@ def upload_dfs_to_database():
for file_name, df in dict_of_dfs.items():
print(df.dtypes, "dtypes")
print(df.head())
- if file_name in immutable_df_dict:
+ print(file_name,"<<< FILE NAME")
+ print(immutable_df_dict,"<<<IMMUTABLE_DF_DICT")
+ if file_name in immutable_df_dict:
table_name = file_name.split(".")[0]
print(table_name, "<<<<<")
try:
@@ -248,7 +249,8 @@ def upload_dfs_to_database():
raise
else:
upload_status["not_uploaded"].append(file_name)
- logger.error(f"{file_name} does not correspond with table in database", exc_info=True)
+ logger.error(f"{file_name} does not correspond with table in database", exc_info=True)
+ print(upload_status)
db_engine.dispose()
return upload_status
diff --git a/src/transform_lambda/dataframes.py b/src/transform_lambda/dataframes.py
index e89a6b2..c823b87 100644
--- a/src/transform_lambda/dataframes.py
+++ b/src/transform_lambda/dataframes.py
@@ -19,7 +19,6 @@ import requests
# no test, same as fact_payment
def create_fact_sales_order(dict_of_df):
df_sales = dict_of_df["sales_order"].rename(columns={"staff_id": "sales_staff_id"})
- df_sales.index.name = "sales_record_id"
df_sales["created_date"] = df_sales["created_at"].astype("datetime64[ns]").dt.date
df_sales["created_time"] = (
@@ -55,9 +54,11 @@ def create_fact_sales_order(dict_of_df):
"agreed_delivery_location_id"
],
]
+ fact_sales.convert_dtypes()
+ fact_sales.index = pd.RangeIndex(1, len(fact_sales.index) + 1)
+ fact_sales.index.name = "sales_record_id"
fact_sales.reset_index(inplace=True)
-
-
+ fact_sales.dropna(inplace=True)
return fact_sales
@@ -66,7 +67,6 @@ def create_fact_sales_order(dict_of_df):
def create_fact_purchase_orders(dict_of_df):
df_po = dict_of_df["purchase_order"]
- df_po.index.name = "purchase_record_id"
df_po["created_date"] = df_po["created_at"].astype("datetime64[ns]").dt.date
df_po["created_time"] = (
df_po["created_at"].astype("datetime64[ns]").dt.floor("s").dt.time
@@ -100,7 +100,11 @@ def create_fact_purchase_orders(dict_of_df):
]
]
+ fact_purchase_order.convert_dtypes()
+ fact_purchase_order.index = pd.RangeIndex(1, len(fact_purchase_order.index) + 1)
+ fact_purchase_order.index.name = "purchase_record_id"
fact_purchase_order.reset_index(inplace=True)
+ fact_purchase_order.dropna(inplace=True)
return fact_purchase_order
@@ -109,7 +113,6 @@ def create_fact_purchase_orders(dict_of_df):
def create_fact_payment(dict_of_df):
df_payment = dict_of_df["payment"]
- df_payment.index.name = "payment_record_id"
df_payment["created_date"] = (
df_payment["created_at"].astype("datetime64[ns]").dt.date
)
@@ -141,7 +144,12 @@ def create_fact_payment(dict_of_df):
"payment_date"
]
]
+ fact_payment.convert_dtypes()
+ fact_payment.index = pd.RangeIndex(1, len(fact_payment.index) + 1)
+ fact_payment.index.name = "payment_record_id"
fact_payment.reset_index(inplace=True)
+ fact_payment.dropna(inplace=True)
+ fact_payment = fact_payment.astype({'currency_id':'int','payment_id':'int'})
return fact_payment
@@ -157,6 +165,7 @@ def create_dim_transaction(dict_of_df):
"purchase_order_id"
]
]
+ #dim_transaction = dim_transaction.astype({"sales_order_id":"Int64","purchase_order_id":"Int64"})
return dim_transaction
diff --git a/src/transform_lambda/transform_lambda.py b/src/transform_lambda/transform_lambda.py
index 0b5748b..5ea8cf0 100644
--- a/src/transform_lambda/transform_lambda.py
+++ b/src/transform_lambda/transform_lambda.py
@@ -65,13 +65,15 @@ def lambda_handler(event, context):
"dim_location": create_dim_location(dict_of_df),
"dim_staff": create_dim_staff(dict_of_df),
"dim_design": create_dim_design(dict_of_df),
+ "dim_transaction": create_dim_transaction(dict_of_df),
+ "dim_payment_type": create_dim_payment_type(dict_of_df)
}
mutable_df_dict = {
"fact_sales_order": create_fact_sales_order(dict_of_df),
"fact_purchase_order": create_fact_purchase_orders(dict_of_df),
"fact_payment": create_fact_payment(dict_of_df),
- "dim_currency": create_dim_currency(dict_of_df),
+ "dim_currency": create_dim_currency(dict_of_df)
}
print(immutable_df_dict.values())
print(mutable_df_dict.values())
git.ajschof.me — hosted by ajschofield — powered by cgit