diff options
| author | Alex <git@ajschof.me> | 2024-09-03 16:07:37 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-09-03 16:07:37 +0100 |
| commit | ce30178558cc8222e9975273eb5d08a93ae92fcc (patch) | |
| tree | 4152f9efe54364a5d6a6cc969befb6cea9015a5b /src/transform_lambda | |
| parent | e4e360630c90d7e801d99097b3e46e8299ab901d (diff) | |
| parent | 3b8e89968e3d3d3527ea76b4517b0d7278512530 (diff) | |
| download | de-project-bentley-ce30178558cc8222e9975273eb5d08a93ae92fcc.tar.gz de-project-bentley-ce30178558cc8222e9975273eb5d08a93ae92fcc.zip | |
Merge branch 'development' into test/tests_transform_lambda
Diffstat (limited to 'src/transform_lambda')
| -rw-r--r-- | src/transform_lambda/dataframes.py | 148 | ||||
| -rw-r--r-- | src/transform_lambda/transform_lambda.py | 5 |
2 files changed, 103 insertions, 50 deletions
diff --git a/src/transform_lambda/dataframes.py b/src/transform_lambda/dataframes.py index 2a46bd6..6de58e7 100644 --- a/src/transform_lambda/dataframes.py +++ b/src/transform_lambda/dataframes.py @@ -18,8 +18,7 @@ import requests # no test, same as fact_payment def create_fact_sales_order(dict_of_df): - df_sales = dict_of_df["sales_order"] - df_sales.index.name = "sales_record_id" + df_sales = dict_of_df["sales_order"].rename(columns={"staff_id": "sales_staff_id"}) df_sales["created_date"] = df_sales["created_at"].astype("datetime64[ns]").dt.date df_sales["created_time"] = ( @@ -37,30 +36,31 @@ def create_fact_sales_order(dict_of_df): df_sales["agreed_payment_date"] = pd.to_datetime( df_sales["agreed_payment_date"], format="%Y-%m-%d" ) - df_sales = df_sales.drop(labels=["created_at", "last_updated"], axis=1) - - df_sales.reset_index(inplace=True) - return df_sales - - df_sales["created_date"] = df_sales["created_at"].astype("datetime64[ns]").dt.date - df_sales["created_time"] = ( - df_sales["created_at"].astype("datetime64[ns]").dt.floor("s").dt.time - ) - df_sales["last_updated_date"] = ( - df_sales["last_updated"].astype("datetime64[ns]").dt.date - ) - df_sales["last_updated_time"] = ( - df_sales["last_updated"].astype("datetime64[ns]").dt.floor("s").dt.time - ) - df_sales["agreed_delivery_date"] = pd.to_datetime( - df_sales["agreed_delivery_date"], format="%Y-%m-%d" - ) - df_sales["agreed_payment_date"] = pd.to_datetime( - df_sales["agreed_payment_date"], format="%Y-%m-%d" - ) - df_sales = df_sales.drop(labels=["created_at", "last_updated"], axis=1) - df_sales.reset_index(inplace=True) - return df_sales + fact_sales = df_sales.loc[ + :, + [ + "sales_order_id", + "created_date", + "created_time", + "last_updated_date", + "last_updated_time", + "sales_staff_id", + "counterparty_id", + "units_sold", + "unit_price", + "currency_id", + "design_id", + "agreed_payment_date", + "agreed_delivery_date", + "agreed_delivery_location_id", + ], + ] + fact_sales.convert_dtypes() + fact_sales.index = pd.RangeIndex(1, len(fact_sales.index) + 1) + fact_sales.index.name = "sales_record_id" + fact_sales.reset_index(inplace=True) + fact_sales.dropna(inplace=True) + return fact_sales # no test, same as fact_payment @@ -68,7 +68,6 @@ def create_fact_sales_order(dict_of_df): def create_fact_purchase_orders(dict_of_df): df_po = dict_of_df["purchase_order"] - df_po.index.name = "purchase_record_id" df_po["created_date"] = df_po["created_at"].astype("datetime64[ns]").dt.date df_po["created_time"] = ( df_po["created_at"].astype("datetime64[ns]").dt.floor("s").dt.time @@ -83,9 +82,31 @@ def create_fact_purchase_orders(dict_of_df): df_po["agreed_payment_date"] = pd.to_datetime( df_po["agreed_payment_date"], format="%Y-%m-%d" ) - df_po = df_po.drop(labels=["created_at", "last_updated"], axis=1) - df_po.reset_index(inplace=True) - return df_po + fact_purchase_order = df_po.loc[ + :, + [ + "purchase_order_id", + "created_date", + "created_time", + "last_updated_date", + "last_updated_time", + "staff_id", + "counterparty_id", + "item_code", + "item_quantity", + "item_unit_price", + "currency_id", + "agreed_delivery_date", + "agreed_payment_date", + "agreed_delivery_location_id", + ], + ] + fact_purchase_order.convert_dtypes() + fact_purchase_order.index = pd.RangeIndex(1, len(fact_purchase_order.index) + 1) + fact_purchase_order.index.name = "purchase_record_id" + fact_purchase_order.reset_index(inplace=True) + fact_purchase_order.dropna(inplace=True) + return fact_purchase_order # test passed @@ -93,7 +114,6 @@ def create_fact_purchase_orders(dict_of_df): def create_fact_payment(dict_of_df): df_payment = dict_of_df["payment"] - df_payment.index.name = "payment_record_id" df_payment["created_date"] = ( df_payment["created_at"].astype("datetime64[ns]").dt.date ) @@ -109,38 +129,60 @@ def create_fact_payment(dict_of_df): df_payment["payment_date"] = pd.to_datetime( df_payment["payment_date"], format="%Y-%m-%d" ) - df_payment = df_payment.drop(labels=["created_at", "last_updated"], axis=1) - - df_payment.reset_index(inplace=True) - return df_payment + fact_payment = df_payment.loc[ + :, + [ + "payment_id", + "created_date", + "created_time", + "last_updated_date", + "last_updated_time", + "transaction_id", + "counterparty_id", + "payment_amount", + "currency_id", + "payment_type_id", + "paid", + "payment_date", + ], + ] + fact_payment.convert_dtypes() + fact_payment.index = pd.RangeIndex(1, len(fact_payment.index) + 1) + fact_payment.index.name = "payment_record_id" + fact_payment.reset_index(inplace=True) + fact_payment.dropna(inplace=True) + fact_payment = fact_payment.astype({"currency_id": "int", "payment_id": "int"}) + return fact_payment # test passed def create_dim_transaction(dict_of_df): - df_transaction = dict_of_df["transaction"].drop( - labels=["created_at", "last_updated"], axis=1 - ) - return df_transaction + dim_transaction = dict_of_df["transaction"].loc[ + :, ["transaction_id", "transaction_type", "sales_order_id", "purchase_order_id"] + ] + # dim_transaction = dim_transaction.astype({"sales_order_id":"Int64","purchase_order_id":"Int64"}) + return dim_transaction # test passed def create_dim_location(dict_of_df): - df_loc = ( + dim_location = ( dict_of_df["address"] .drop(labels=["created_at", "last_updated"], axis=1) .rename(columns={"address_id": "location_id"}) ) - return df_loc + return dim_location def create_dim_counterparty(dict_of_df): df_prefixed_address = ( dict_of_df["address"] .drop(labels=["created_at", "last_updated"], axis=1) + .rename(columns={"phone": "phone_number"}) .add_prefix("counterparty_legal_", axis=1) ) df_cp = pd.merge( @@ -149,15 +191,19 @@ def create_dim_counterparty(dict_of_df): left_on="legal_address_id", right_on="counterparty_legal_address_id", how="inner", - ) - df_cp.drop( - columns=[ + ) # .dropna(inplace=True) + dim_counterparty = df_cp.drop( + labels=[ "legal_address_id", "counterparty_legal_address_id", + "created_at", + "last_updated", + "commercial_contact", + "delivery_contact", ], - inplace=True, + axis=1, ) - return df_cp + return dim_counterparty # test passed @@ -179,6 +225,7 @@ def create_dim_date(dict_of_df): sr_date = pd.array(pd.concat(list_of_date_columns), dtype="datetime64[ns]") df_date = pd.DataFrame(data=sr_date, columns=["date_id"]) df_date.drop_duplicates(inplace=True) + # df_date.dropna(inplace=True) df_date["year"] = df_date["date_id"].dt.year df_date["month"] = df_date["date_id"].dt.month df_date["day"] = df_date["date_id"].dt.day @@ -210,10 +257,13 @@ def scrape_currency_names(): def create_dim_currency(dict_of_df, names=scrape_currency_names()): df_cur = dict_of_df["currency"].drop(labels=["created_at", "last_updated"], axis=1) - dim_cur = pd.merge( - df_cur, names, left_on="currency_code", right_on="currency_code", how="inner" + dim_currency = pd.merge( + df_cur, names, left_on="currency_code", right_on="currency_code", how="left" ) - return dim_cur + dim_currency.drop_duplicates(inplace=True) + dim_currency.astype({"currency_name": "string", "currency_code": "string"}) + print(dim_currency.dtypes, "<<<<<<<<<Dtype") + return dim_currency # tests passed diff --git a/src/transform_lambda/transform_lambda.py b/src/transform_lambda/transform_lambda.py index 478b257..54d7d48 100644 --- a/src/transform_lambda/transform_lambda.py +++ b/src/transform_lambda/transform_lambda.py @@ -65,6 +65,8 @@ def lambda_handler(event, context): "dim_location": create_dim_location(dict_of_df), "dim_staff": create_dim_staff(dict_of_df), "dim_design": create_dim_design(dict_of_df), + "dim_transaction": create_dim_transaction(dict_of_df), + "dim_payment_type": create_dim_payment_type(dict_of_df), } mutable_df_dict = { @@ -73,7 +75,8 @@ def lambda_handler(event, context): "fact_payment": create_fact_payment(dict_of_df), "dim_currency": create_dim_currency(dict_of_df), } - + print(immutable_df_dict.values()) + print(mutable_df_dict.values()) status = process_to_parquet_and_upload_to_s3( existing_s3_files, immutable_df_dict, mutable_df_dict, bucket ) |
