diff options
| author | deepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com> | 2024-08-29 08:57:48 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-08-29 08:57:48 +0000 |
| commit | 42ad135b25044bb1c7ab8a553f038c8da9de0f75 (patch) | |
| tree | cb765be2b57404e3e93c18e95bd41c427d08a918 /src | |
| parent | 48e7daec8b5435a696fe572fd51dcbc8f9604a2d (diff) | |
| download | de-project-bentley-42ad135b25044bb1c7ab8a553f038c8da9de0f75.tar.gz de-project-bentley-42ad135b25044bb1c7ab8a553f038c8da9de0f75.zip | |
style: format code with Autopep8, Black and Ruff Formatter
This commit fixes the style issues introduced in 48e7dae according to the output
from Autopep8, Black and Ruff Formatter.
Details: https://github.com/ajschofield/de-project-bentley/pull/107
Diffstat (limited to 'src')
| -rw-r--r-- | src/load_lambda.py | 78 |
| -rw-r--r-- | src/transform_lambda/dataframes.py | 116 |
| -rw-r--r-- | src/transform_lambda/transform_lambda.py | 6 |
3 files changed, 105 insertions, 95 deletions
diff --git a/src/load_lambda.py b/src/load_lambda.py index 941ae97..86189dc 100644 --- a/src/load_lambda.py +++ b/src/load_lambda.py @@ -58,10 +58,14 @@ def retrieve_secrets(client=None, secret_name=None): try: get_secret_value_response = client.get_secret_value(SecretId=secret_name) except ClientError as e: - logger.error(f"Failed to retrieve secret {secret_name}: {str(e)}", exc_info=True) + logger.error( + f"Failed to retrieve secret {secret_name}: {str(e)}", exc_info=True + ) raise e except KeyError: - logger.error(f"Secret {secret_name} does not contain a SecretString", exc_info=True) + logger.error( + f"Secret {secret_name} does not contain a SecretString", exc_info=True + ) raise ValueError(f"Secret {secret_name} does not contain a SecretString") return get_secret_value_response["SecretString"] @@ -117,6 +121,7 @@ def get_transform_bucket(client=None): # convert parquet files into dataframes # return a dictionary of dataframes with name as key, and dataframe object as value + def get_latest_timestamp(existing_files): if existing_files: all_datetimes = [] @@ -124,19 +129,17 @@ def get_latest_timestamp(existing_files): match = re.search(r"\/(.+/).+_(.+)\.parquet", file_name) if match: datetime_str = "".join(match.group(1, 2)) - all_datetimes.append( - dt.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S") - ) + all_datetimes.append(dt.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S")) return max(all_datetimes) if all_datetimes else dt.min return existing_files + def convert_parquet_files_to_dfs(bucket_name=None, client=None): mutable_df_dict = [ "dim_currency", "fact_sales_order", "fact_purchase_order", - "fact_payment" - + "fact_payment", ] try: @@ -145,12 +148,12 @@ def convert_parquet_files_to_dfs(bucket_name=None, client=None): if bucket_name is None: bucket_name = get_transform_bucket() files = client.list_objects_v2(Bucket=bucket_name) - + dfs = {} if "Contents" in files: - s3_key_list = [file["Key"]for file in files["Contents"]] + s3_key_list = [file["Key"] for file 
in files["Contents"]] immutables_l = [] - mutables_d = {prefix:[] for prefix in mutable_df_dict} + mutables_d = {prefix: [] for prefix in mutable_df_dict} for tab, s3_key in mutables_d.items(): for file in s3_key_list: if tab in file: @@ -161,22 +164,31 @@ def convert_parquet_files_to_dfs(bucket_name=None, client=None): continue immutables_l = list(set(immutables_l)) latest_s3_keys = [] - for k,v in mutables_d.items(): - latest_s3_keys.append(dt.strftime(get_latest_timestamp(v), f"{k}/%Y/%m/%d/{k}_%H:%M:%S.parquet")) - for file_key in immutables_l+latest_s3_keys: + for k, v in mutables_d.items(): + latest_s3_keys.append( + dt.strftime( + get_latest_timestamp(v), f"{k}/%Y/%m/%d/{k}_%H:%M:%S.parquet" + ) + ) + for file_key in immutables_l + latest_s3_keys: try: file_obj = client.get_object(Bucket=bucket_name, Key=file_key) parquet_file = pq.ParquetFile(BytesIO(file_obj["Body"].read())) df = parquet_file.read().to_pandas() - df_without_nulls = df.dropna(how='all') #>> can't do 'any' (default) because we lose rows in dim_location - #print("df_without_nulls", df_without_nulls) - #print("type", type(df_without_nulls)) - #print(df_without_nulls.columns) + # >> can't do 'any' (default) because we lose rows in dim_location + df_without_nulls = df.dropna(how="all") + # print("df_without_nulls", df_without_nulls) + # print("type", type(df_without_nulls)) + # print(df_without_nulls.columns) dfs[file_key] = df_without_nulls except ClientError as e: - logger.error(f"Unable to retrieve S3 object {file_key}: {e}", exc_info=True) + logger.error( + f"Unable to retrieve S3 object {file_key}: {e}", exc_info=True + ) except Exception as e: - logger.error(f"Unable to process file {file_key}: {e}", exc_info=True) + logger.error( + f"Unable to process file {file_key}: {e}", exc_info=True + ) else: logger.error(f"No files found in {bucket_name}.", exc_info=True) return {} @@ -199,23 +211,22 @@ def upload_dfs_to_database(): "dim_location.parquet", "dim_staff.parquet", "dim_design.parquet", 
- 'dim_transaction.parquet', #This one was missing, - 'dim_payment_type.parquet' + "dim_transaction.parquet", # This one was missing, + "dim_payment_type.parquet", ] mutable_df_dict = [ "dim_currency", "fact_sales_order", "fact_purchase_order", - "fact_payment" - + "fact_payment", ] with db_engine.begin() as connection: for file_name, df in dict_of_dfs.items(): print(df.dtypes, "dtypes") print(df.head()) - print(file_name,"<<< FILE NAME") - print(immutable_df_dict,"<<<IMMUTABLE_DF_DICT") - if file_name in immutable_df_dict: + print(file_name, "<<< FILE NAME") + print(immutable_df_dict, "<<<IMMUTABLE_DF_DICT") + if file_name in immutable_df_dict: table_name = file_name.split(".")[0] print(table_name, "<<<<<") try: @@ -229,7 +240,10 @@ def upload_dfs_to_database(): upload_status["uploaded"].append(table_name) print(upload_status) except Exception as e: - logger.error(f"Error uploading dataframe {file_name} to database: {e}", exc_info=True) + logger.error( + f"Error uploading dataframe {file_name} to database: {e}", + exc_info=True, + ) raise elif file_name.split("/")[0] in mutable_df_dict: table_name = file_name.split("/")[0] @@ -244,11 +258,17 @@ def upload_dfs_to_database(): ) upload_status["uploaded"].append(table_name) except Exception as e: - logger.error(f"Error uploading dataframe {file_name} to database: {e}", exc_info=True) + logger.error( + f"Error uploading dataframe {file_name} to database: {e}", + exc_info=True, + ) raise else: upload_status["not_uploaded"].append(file_name) - logger.error(f"{file_name} does not correspond with table in database", exc_info=True) + logger.error( + f"{file_name} does not correspond with table in database", + exc_info=True, + ) print(upload_status) db_engine.dispose() return upload_status diff --git a/src/transform_lambda/dataframes.py b/src/transform_lambda/dataframes.py index c823b87..6de58e7 100644 --- a/src/transform_lambda/dataframes.py +++ b/src/transform_lambda/dataframes.py @@ -36,7 +36,8 @@ def 
create_fact_sales_order(dict_of_df): df_sales["agreed_payment_date"] = pd.to_datetime( df_sales["agreed_payment_date"], format="%Y-%m-%d" ) - fact_sales = df_sales.loc[:, + fact_sales = df_sales.loc[ + :, [ "sales_order_id", "created_date", @@ -51,7 +52,7 @@ def create_fact_sales_order(dict_of_df): "design_id", "agreed_payment_date", "agreed_delivery_date", - "agreed_delivery_location_id" + "agreed_delivery_location_id", ], ] fact_sales.convert_dtypes() @@ -81,24 +82,24 @@ def create_fact_purchase_orders(dict_of_df): df_po["agreed_payment_date"] = pd.to_datetime( df_po["agreed_payment_date"], format="%Y-%m-%d" ) - fact_purchase_order = df_po.loc[:, - [ - "purchase_order_id", - "created_date", - "created_time", - "last_updated_date", - "last_updated_time", - "staff_id", - "counterparty_id", - "item_code", - "item_quantity", - "item_unit_price", - "currency_id", - "agreed_delivery_date", - "agreed_payment_date", - "agreed_delivery_location_id" - ] - + fact_purchase_order = df_po.loc[ + :, + [ + "purchase_order_id", + "created_date", + "created_time", + "last_updated_date", + "last_updated_time", + "staff_id", + "counterparty_id", + "item_code", + "item_quantity", + "item_unit_price", + "currency_id", + "agreed_delivery_date", + "agreed_payment_date", + "agreed_delivery_location_id", + ], ] fact_purchase_order.convert_dtypes() fact_purchase_order.index = pd.RangeIndex(1, len(fact_purchase_order.index) + 1) @@ -128,28 +129,29 @@ def create_fact_payment(dict_of_df): df_payment["payment_date"] = pd.to_datetime( df_payment["payment_date"], format="%Y-%m-%d" ) - fact_payment = df_payment.loc[:, + fact_payment = df_payment.loc[ + :, [ - "payment_id", - "created_date", - "created_time", - "last_updated_date", - "last_updated_time", - "transaction_id", - "counterparty_id", - "payment_amount", - "currency_id", - "payment_type_id", - "paid", - "payment_date" - ] + "payment_id", + "created_date", + "created_time", + "last_updated_date", + "last_updated_time", + "transaction_id", 
+ "counterparty_id", + "payment_amount", + "currency_id", + "payment_type_id", + "paid", + "payment_date", + ], ] fact_payment.convert_dtypes() fact_payment.index = pd.RangeIndex(1, len(fact_payment.index) + 1) fact_payment.index.name = "payment_record_id" fact_payment.reset_index(inplace=True) fact_payment.dropna(inplace=True) - fact_payment = fact_payment.astype({'currency_id':'int','payment_id':'int'}) + fact_payment = fact_payment.astype({"currency_id": "int", "payment_id": "int"}) return fact_payment @@ -157,15 +159,10 @@ def create_fact_payment(dict_of_df): def create_dim_transaction(dict_of_df): - dim_transaction = dict_of_df["transaction"].loc[:, - [ - "transaction_id", - "transaction_type", - "sales_order_id", - "purchase_order_id" - ] + dim_transaction = dict_of_df["transaction"].loc[ + :, ["transaction_id", "transaction_type", "sales_order_id", "purchase_order_id"] ] - #dim_transaction = dim_transaction.astype({"sales_order_id":"Int64","purchase_order_id":"Int64"}) + # dim_transaction = dim_transaction.astype({"sales_order_id":"Int64","purchase_order_id":"Int64"}) return dim_transaction @@ -174,7 +171,8 @@ def create_dim_transaction(dict_of_df): def create_dim_location(dict_of_df): dim_location = ( - dict_of_df["address"].drop(labels=["created_at", "last_updated"], axis=1) + dict_of_df["address"] + .drop(labels=["created_at", "last_updated"], axis=1) .rename(columns={"address_id": "location_id"}) ) return dim_location @@ -193,7 +191,7 @@ def create_dim_counterparty(dict_of_df): left_on="legal_address_id", right_on="counterparty_legal_address_id", how="inner", - )#.dropna(inplace=True) + ) # .dropna(inplace=True) dim_counterparty = df_cp.drop( labels=[ "legal_address_id", @@ -201,8 +199,9 @@ def create_dim_counterparty(dict_of_df): "created_at", "last_updated", "commercial_contact", - "delivery_contact" - ], axis=1 + "delivery_contact", + ], + axis=1, ) return dim_counterparty @@ -272,12 +271,7 @@ def create_dim_currency(dict_of_df, 
names=scrape_currency_names()): def create_dim_payment_type(dict_of_df): df_payment_type = dict_of_df["payment_type"] - dim_payment_type = df_payment_type.loc[:, - [ - "payment_type_id", - "payment_type_name" - ] - ] + dim_payment_type = df_payment_type.loc[:, ["payment_type_id", "payment_type_name"]] return dim_payment_type @@ -286,13 +280,8 @@ def create_dim_payment_type(dict_of_df): def create_dim_design(dict_of_df): df_design = dict_of_df["design"] - dim_design = df_design.loc[:, - [ - "design_id", - "design_name", - "file_name", - "file_location" - ] + dim_design = df_design.loc[ + :, ["design_id", "design_name", "file_name", "file_location"] ] return dim_design @@ -304,14 +293,15 @@ def create_dim_staff(dict_of_df): staff_department = pd.merge( dict_of_df["staff"], dict_of_df["department"], on="department_id", how="left" ) - dim_staff = staff_department.loc[:, + dim_staff = staff_department.loc[ + :, [ "staff_id", "first_name", "last_name", "department_name", "location", - "email_address" - ] + "email_address", + ], ] return dim_staff diff --git a/src/transform_lambda/transform_lambda.py b/src/transform_lambda/transform_lambda.py index 5ea8cf0..2739997 100644 --- a/src/transform_lambda/transform_lambda.py +++ b/src/transform_lambda/transform_lambda.py @@ -42,7 +42,7 @@ TABLES = [ "department", "currency", "design", - "payment_type" + "payment_type", ] @@ -66,14 +66,14 @@ def lambda_handler(event, context): "dim_staff": create_dim_staff(dict_of_df), "dim_design": create_dim_design(dict_of_df), "dim_transaction": create_dim_transaction(dict_of_df), - "dim_payment_type": create_dim_payment_type(dict_of_df) + "dim_payment_type": create_dim_payment_type(dict_of_df), } mutable_df_dict = { "fact_sales_order": create_fact_sales_order(dict_of_df), "fact_purchase_order": create_fact_purchase_orders(dict_of_df), "fact_payment": create_fact_payment(dict_of_df), - "dim_currency": create_dim_currency(dict_of_df) + "dim_currency": create_dim_currency(dict_of_df), } 
print(immutable_df_dict.values()) print(mutable_df_dict.values()) |
