aboutsummaryrefslogtreecommitdiffstats
path: root/src/load_lambda.py
diff options
context:
space:
mode:
authordeepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com>2024-08-29 08:57:48 +0000
committerGitHub <noreply@github.com>2024-08-29 08:57:48 +0000
commit42ad135b25044bb1c7ab8a553f038c8da9de0f75 (patch)
treecb765be2b57404e3e93c18e95bd41c427d08a918 /src/load_lambda.py
parent48e7daec8b5435a696fe572fd51dcbc8f9604a2d (diff)
downloadde-project-bentley-42ad135b25044bb1c7ab8a553f038c8da9de0f75.tar.gz
de-project-bentley-42ad135b25044bb1c7ab8a553f038c8da9de0f75.zip
style: format code with Autopep8, Black and Ruff Formatter
This commit fixes the style issues introduced in 48e7dae according to the output from Autopep8, Black and Ruff Formatter. Details: https://github.com/ajschofield/de-project-bentley/pull/107
Diffstat (limited to 'src/load_lambda.py')
-rw-r--r--src/load_lambda.py78
1 files changed, 49 insertions, 29 deletions
diff --git a/src/load_lambda.py b/src/load_lambda.py
index 941ae97..86189dc 100644
--- a/src/load_lambda.py
+++ b/src/load_lambda.py
@@ -58,10 +58,14 @@ def retrieve_secrets(client=None, secret_name=None):
try:
get_secret_value_response = client.get_secret_value(SecretId=secret_name)
except ClientError as e:
- logger.error(f"Failed to retrieve secret {secret_name}: {str(e)}", exc_info=True)
+ logger.error(
+ f"Failed to retrieve secret {secret_name}: {str(e)}", exc_info=True
+ )
raise e
except KeyError:
- logger.error(f"Secret {secret_name} does not contain a SecretString", exc_info=True)
+ logger.error(
+ f"Secret {secret_name} does not contain a SecretString", exc_info=True
+ )
raise ValueError(f"Secret {secret_name} does not contain a SecretString")
return get_secret_value_response["SecretString"]
@@ -117,6 +121,7 @@ def get_transform_bucket(client=None):
# convert parquet files into dataframes
# return a dictionary of dataframes with name as key, and dataframe object as value
+
def get_latest_timestamp(existing_files):
if existing_files:
all_datetimes = []
@@ -124,19 +129,17 @@ def get_latest_timestamp(existing_files):
match = re.search(r"\/(.+/).+_(.+)\.parquet", file_name)
if match:
datetime_str = "".join(match.group(1, 2))
- all_datetimes.append(
- dt.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S")
- )
+ all_datetimes.append(dt.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S"))
return max(all_datetimes) if all_datetimes else dt.min
return existing_files
+
def convert_parquet_files_to_dfs(bucket_name=None, client=None):
mutable_df_dict = [
"dim_currency",
"fact_sales_order",
"fact_purchase_order",
- "fact_payment"
-
+ "fact_payment",
]
try:
@@ -145,12 +148,12 @@ def convert_parquet_files_to_dfs(bucket_name=None, client=None):
if bucket_name is None:
bucket_name = get_transform_bucket()
files = client.list_objects_v2(Bucket=bucket_name)
-
+
dfs = {}
if "Contents" in files:
- s3_key_list = [file["Key"]for file in files["Contents"]]
+ s3_key_list = [file["Key"] for file in files["Contents"]]
immutables_l = []
- mutables_d = {prefix:[] for prefix in mutable_df_dict}
+ mutables_d = {prefix: [] for prefix in mutable_df_dict}
for tab, s3_key in mutables_d.items():
for file in s3_key_list:
if tab in file:
@@ -161,22 +164,31 @@ def convert_parquet_files_to_dfs(bucket_name=None, client=None):
continue
immutables_l = list(set(immutables_l))
latest_s3_keys = []
- for k,v in mutables_d.items():
- latest_s3_keys.append(dt.strftime(get_latest_timestamp(v), f"{k}/%Y/%m/%d/{k}_%H:%M:%S.parquet"))
- for file_key in immutables_l+latest_s3_keys:
+ for k, v in mutables_d.items():
+ latest_s3_keys.append(
+ dt.strftime(
+ get_latest_timestamp(v), f"{k}/%Y/%m/%d/{k}_%H:%M:%S.parquet"
+ )
+ )
+ for file_key in immutables_l + latest_s3_keys:
try:
file_obj = client.get_object(Bucket=bucket_name, Key=file_key)
parquet_file = pq.ParquetFile(BytesIO(file_obj["Body"].read()))
df = parquet_file.read().to_pandas()
- df_without_nulls = df.dropna(how='all') #>> can't do 'any' (default) because we lose rows in dim_location
- #print("df_without_nulls", df_without_nulls)
- #print("type", type(df_without_nulls))
- #print(df_without_nulls.columns)
+ # >> can't do 'any' (default) because we lose rows in dim_location
+ df_without_nulls = df.dropna(how="all")
+ # print("df_without_nulls", df_without_nulls)
+ # print("type", type(df_without_nulls))
+ # print(df_without_nulls.columns)
dfs[file_key] = df_without_nulls
except ClientError as e:
- logger.error(f"Unable to retrieve S3 object {file_key}: {e}", exc_info=True)
+ logger.error(
+ f"Unable to retrieve S3 object {file_key}: {e}", exc_info=True
+ )
except Exception as e:
- logger.error(f"Unable to process file {file_key}: {e}", exc_info=True)
+ logger.error(
+ f"Unable to process file {file_key}: {e}", exc_info=True
+ )
else:
logger.error(f"No files found in {bucket_name}.", exc_info=True)
return {}
@@ -199,23 +211,22 @@ def upload_dfs_to_database():
"dim_location.parquet",
"dim_staff.parquet",
"dim_design.parquet",
- 'dim_transaction.parquet', #This one was missing,
- 'dim_payment_type.parquet'
+ "dim_transaction.parquet", # This one was missing,
+ "dim_payment_type.parquet",
]
mutable_df_dict = [
"dim_currency",
"fact_sales_order",
"fact_purchase_order",
- "fact_payment"
-
+ "fact_payment",
]
with db_engine.begin() as connection:
for file_name, df in dict_of_dfs.items():
print(df.dtypes, "dtypes")
print(df.head())
- print(file_name,"<<< FILE NAME")
- print(immutable_df_dict,"<<<IMMUTABLE_DF_DICT")
- if file_name in immutable_df_dict:
+ print(file_name, "<<< FILE NAME")
+ print(immutable_df_dict, "<<<IMMUTABLE_DF_DICT")
+ if file_name in immutable_df_dict:
table_name = file_name.split(".")[0]
print(table_name, "<<<<<")
try:
@@ -229,7 +240,10 @@ def upload_dfs_to_database():
upload_status["uploaded"].append(table_name)
print(upload_status)
except Exception as e:
- logger.error(f"Error uploading dataframe {file_name} to database: {e}", exc_info=True)
+ logger.error(
+ f"Error uploading dataframe {file_name} to database: {e}",
+ exc_info=True,
+ )
raise
elif file_name.split("/")[0] in mutable_df_dict:
table_name = file_name.split("/")[0]
@@ -244,11 +258,17 @@ def upload_dfs_to_database():
)
upload_status["uploaded"].append(table_name)
except Exception as e:
- logger.error(f"Error uploading dataframe {file_name} to database: {e}", exc_info=True)
+ logger.error(
+ f"Error uploading dataframe {file_name} to database: {e}",
+ exc_info=True,
+ )
raise
else:
upload_status["not_uploaded"].append(file_name)
- logger.error(f"{file_name} does not correspond with table in database", exc_info=True)
+ logger.error(
+ f"{file_name} does not correspond with table in database",
+ exc_info=True,
+ )
print(upload_status)
db_engine.dispose()
return upload_status
git.ajschof.me — hosted by ajschofield — powered by cgit