diff options
| author | deepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com> | 2024-08-29 08:57:48 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-08-29 08:57:48 +0000 |
| commit | 42ad135b25044bb1c7ab8a553f038c8da9de0f75 (patch) | |
| tree | cb765be2b57404e3e93c18e95bd41c427d08a918 /src/load_lambda.py | |
| parent | 48e7daec8b5435a696fe572fd51dcbc8f9604a2d (diff) | |
| download | de-project-bentley-42ad135b25044bb1c7ab8a553f038c8da9de0f75.tar.gz de-project-bentley-42ad135b25044bb1c7ab8a553f038c8da9de0f75.zip | |
style: format code with Autopep8, Black and Ruff Formatter
This commit fixes the style issues introduced in 48e7dae according to the output
from Autopep8, Black and Ruff Formatter.
Details: https://github.com/ajschofield/de-project-bentley/pull/107
Diffstat (limited to 'src/load_lambda.py')
| -rw-r--r-- | src/load_lambda.py | 78 |
1 files changed, 49 insertions, 29 deletions
diff --git a/src/load_lambda.py b/src/load_lambda.py index 941ae97..86189dc 100644 --- a/src/load_lambda.py +++ b/src/load_lambda.py @@ -58,10 +58,14 @@ def retrieve_secrets(client=None, secret_name=None): try: get_secret_value_response = client.get_secret_value(SecretId=secret_name) except ClientError as e: - logger.error(f"Failed to retrieve secret {secret_name}: {str(e)}", exc_info=True) + logger.error( + f"Failed to retrieve secret {secret_name}: {str(e)}", exc_info=True + ) raise e except KeyError: - logger.error(f"Secret {secret_name} does not contain a SecretString", exc_info=True) + logger.error( + f"Secret {secret_name} does not contain a SecretString", exc_info=True + ) raise ValueError(f"Secret {secret_name} does not contain a SecretString") return get_secret_value_response["SecretString"] @@ -117,6 +121,7 @@ def get_transform_bucket(client=None): # convert parquet files into dataframes # return a dictionary of dataframes with name as key, and dataframe object as value + def get_latest_timestamp(existing_files): if existing_files: all_datetimes = [] @@ -124,19 +129,17 @@ def get_latest_timestamp(existing_files): match = re.search(r"\/(.+/).+_(.+)\.parquet", file_name) if match: datetime_str = "".join(match.group(1, 2)) - all_datetimes.append( - dt.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S") - ) + all_datetimes.append(dt.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S")) return max(all_datetimes) if all_datetimes else dt.min return existing_files + def convert_parquet_files_to_dfs(bucket_name=None, client=None): mutable_df_dict = [ "dim_currency", "fact_sales_order", "fact_purchase_order", - "fact_payment" - + "fact_payment", ] try: @@ -145,12 +148,12 @@ def convert_parquet_files_to_dfs(bucket_name=None, client=None): if bucket_name is None: bucket_name = get_transform_bucket() files = client.list_objects_v2(Bucket=bucket_name) - + dfs = {} if "Contents" in files: - s3_key_list = [file["Key"]for file in files["Contents"]] + s3_key_list = [file["Key"] for file in files["Contents"]] immutables_l = [] - mutables_d = {prefix:[] for prefix in mutable_df_dict} + mutables_d = {prefix: [] for prefix in mutable_df_dict} for tab, s3_key in mutables_d.items(): for file in s3_key_list: if tab in file: @@ -161,22 +164,31 @@ def convert_parquet_files_to_dfs(bucket_name=None, client=None): continue immutables_l = list(set(immutables_l)) latest_s3_keys = [] - for k,v in mutables_d.items(): - latest_s3_keys.append(dt.strftime(get_latest_timestamp(v), f"{k}/%Y/%m/%d/{k}_%H:%M:%S.parquet")) - for file_key in immutables_l+latest_s3_keys: + for k, v in mutables_d.items(): + latest_s3_keys.append( + dt.strftime( + get_latest_timestamp(v), f"{k}/%Y/%m/%d/{k}_%H:%M:%S.parquet" + ) + ) + for file_key in immutables_l + latest_s3_keys: try: file_obj = client.get_object(Bucket=bucket_name, Key=file_key) parquet_file = pq.ParquetFile(BytesIO(file_obj["Body"].read())) df = parquet_file.read().to_pandas() - df_without_nulls = df.dropna(how='all') #>> can't do 'any' (default) because we lose rows in dim_location - #print("df_without_nulls", df_without_nulls) - #print("type", type(df_without_nulls)) - #print(df_without_nulls.columns) + # >> can't do 'any' (default) because we lose rows in dim_location + df_without_nulls = df.dropna(how="all") + # print("df_without_nulls", df_without_nulls) + # print("type", type(df_without_nulls)) + # print(df_without_nulls.columns) dfs[file_key] = df_without_nulls except ClientError as e: - logger.error(f"Unable to retrieve S3 object {file_key}: {e}", exc_info=True) + logger.error( + f"Unable to retrieve S3 object {file_key}: {e}", exc_info=True + ) except Exception as e: - logger.error(f"Unable to process file {file_key}: {e}", exc_info=True) + logger.error( + f"Unable to process file {file_key}: {e}", exc_info=True + ) else: logger.error(f"No files found in {bucket_name}.", exc_info=True) return {} @@ -199,23 +211,22 @@ def upload_dfs_to_database(): "dim_location.parquet", "dim_staff.parquet", "dim_design.parquet", - 'dim_transaction.parquet', #This one was missing, - 'dim_payment_type.parquet' + "dim_transaction.parquet", # This one was missing, + "dim_payment_type.parquet", ] mutable_df_dict = [ "dim_currency", "fact_sales_order", "fact_purchase_order", - "fact_payment" - + "fact_payment", ] with db_engine.begin() as connection: for file_name, df in dict_of_dfs.items(): print(df.dtypes, "dtypes") print(df.head()) - print(file_name,"<<< FILE NAME") - print(immutable_df_dict,"<<<IMMUTABLE_DF_DICT") - if file_name in immutable_df_dict: + print(file_name, "<<< FILE NAME") + print(immutable_df_dict, "<<<IMMUTABLE_DF_DICT") + if file_name in immutable_df_dict: table_name = file_name.split(".")[0] print(table_name, "<<<<<") try: @@ -229,7 +240,10 @@ def upload_dfs_to_database(): upload_status["uploaded"].append(table_name) print(upload_status) except Exception as e: - logger.error(f"Error uploading dataframe {file_name} to database: {e}", exc_info=True) + logger.error( + f"Error uploading dataframe {file_name} to database: {e}", + exc_info=True, + ) raise elif file_name.split("/")[0] in mutable_df_dict: table_name = file_name.split("/")[0] @@ -244,11 +258,17 @@ def upload_dfs_to_database(): ) upload_status["uploaded"].append(table_name) except Exception as e: - logger.error(f"Error uploading dataframe {file_name} to database: {e}", exc_info=True) + logger.error( + f"Error uploading dataframe {file_name} to database: {e}", + exc_info=True, + ) raise else: upload_status["not_uploaded"].append(file_name) - logger.error(f"{file_name} does not correspond with table in database", exc_info=True) + logger.error( + f"{file_name} does not correspond with table in database", + exc_info=True, + ) print(upload_status) db_engine.dispose() return upload_status |
