From 69edb14dad584d45fa6a83a90c08292b84795507 Mon Sep 17 00:00:00 2001 From: "deepsource-autofix[bot]" <62050782+deepsource-autofix[bot]@users.noreply.github.com> Date: Fri, 23 Aug 2024 16:11:45 +0000 Subject: style: format code with Autopep8, Black and Ruff Formatter This commit fixes the style issues introduced in 0ff2956 according to the output from Autopep8, Black and Ruff Formatter. Details: https://github.com/ajschofield/de-project-bentley/pull/95 --- src/load_lambda.py | 75 +++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 51 insertions(+), 24 deletions(-) (limited to 'src') diff --git a/src/load_lambda.py b/src/load_lambda.py index 8eaea32..6e6bc80 100644 --- a/src/load_lambda.py +++ b/src/load_lambda.py @@ -40,6 +40,7 @@ def lambda_handler(event, context): logger.error(f"Error: {e}", exc_info=True) return {"statusCode": 500, "body": json.dumps("Internal server error.")} + def retrieve_secrets(): secret_name = "bentley-RDS-credentials" region_name = "eu-west-2" @@ -59,7 +60,10 @@ def retrieve_secrets(): return get_secret_value_response["SecretString"] + # connect to database, slightly different way of doing it, to allow manipulation through pandas + + def connect_to_db_and_return_engine(): try: secrets = json.loads(retrieve_secrets()) @@ -68,13 +72,14 @@ def connect_to_db_and_return_engine(): user = secrets["user"] password = secrets["password"] database = secrets["database"] - conn_str = f'postgresql+pg8000://{user}:{password}@{host}:{port}/{database}' - engine = create_engine(conn_str) #interface between python (pandas) and SQL + conn_str = f"postgresql+pg8000://{user}:{password}@{host}:{port}/{database}" + # interface between python (pandas) and SQL + engine = create_engine(conn_str) return engine except Exception as e: logger.error(f"Interface error: {e}") raise RuntimeError("Failed to create database engine") - + # get transform bucket def get_transform_bucket(client=None): @@ -85,9 +90,11 @@ def get_transform_bucket(client=None): except ClientError as e: logger.error(f"Error listing S3 buckets: {e}") raise RuntimeError("Error listing S3 buckets") - + transform_bucket_filter = [ - bucket["Name"] for bucket in response["Buckets"] if "transform" in bucket["Name"] + bucket["Name"] + for bucket in response["Buckets"] + if "transform" in bucket["Name"] ] if not transform_bucket_filter: @@ -96,9 +103,12 @@ def get_transform_bucket(client=None): return transform_bucket_filter[0] + # list and then retrieve parquet files from S3 bucket # convert parquet files into dataframes -# return a dictionary of dataframes with name as key, and dataframe object as value +# return a dictionary of dataframes with name as key, and dataframe object as value + + def convert_parquet_files_to_dfs(bucket_name=None, client=None): try: if client is None: @@ -110,10 +120,10 @@ def convert_parquet_files_to_dfs(bucket_name=None, client=None): dfs = {} if "Contents" in files: for file in files["Contents"]: - file_key = file['Key'] + file_key = file["Key"] try: file_obj = client.get_object(Bucket=bucket_name, Key=file_key) - parquet_file = pq.ParquetFile(BytesIO(file_obj['Body'].read())) + parquet_file = pq.ParquetFile(BytesIO(file_obj["Body"].read())) df = parquet_file.read().to_pandas() dfs[file_key] = df except ClientError as e: @@ -132,34 +142,51 @@ def convert_parquet_files_to_dfs(bucket_name=None, client=None): return dfs + def upload_dfs_to_database(): upload_status = {"uploaded": [], "not_uploaded": []} dict_of_dfs = convert_parquet_files_to_dfs() db_engine = connect_to_db_and_return_engine() - immutable_df_dict = ["dim_counterparty.parquet", - "dim_date.parquet", #this needs to be mutable - "dim_location.parquet", - "dim_staff.parquet", - "dim_design.parquet"] - mutable_df_dict = ["fact_sales_order", - "fact_purchase_order", - "fact_payment", - "dim_currency"] - + immutable_df_dict = [ + "dim_counterparty.parquet", + "dim_date.parquet", # this needs to be mutable + "dim_location.parquet", + "dim_staff.parquet", + "dim_design.parquet", + ] + mutable_df_dict = [ + "fact_sales_order", + "fact_purchase_order", + "fact_payment", + "dim_currency", + ] + for file_name, df in dict_of_dfs.items(): if file_name in immutable_df_dict: table_name = file_name.split(".")[0] try: - df.to_sql(table_name, con=db_engine, schema="project_team_2", if_exists="overwrite", index=False) + df.to_sql( + table_name, + con=db_engine, + schema="project_team_2", + if_exists="overwrite", + index=False, + ) upload_status["uploaded"].append(table_name) except Exception as e: logger.error(f"Error uploading dataframe {file_name} to database: {e}") raise - elif file_name.rsplit('_', 1)[0] in mutable_df_dict: - table_name = file_name.rsplit('_', 1)[0] + elif file_name.rsplit("_", 1)[0] in mutable_df_dict: + table_name = file_name.rsplit("_", 1)[0] try: - df.to_sql(table_name, con=db_engine, schema="project_team_2", if_exists="overwrite", index=False) - upload_status["uploaded"].append(table_name) + df.to_sql( + table_name, + con=db_engine, + schema="project_team_2", + if_exists="overwrite", + index=False, + ) + upload_status["uploaded"].append(table_name) except Exception as e: logger.error(f"Error uploading dataframe {file_name} to database: {e}") raise @@ -167,4 +194,4 @@ def upload_dfs_to_database(): upload_status["not_uploaded"].append(file_name) logger.error(f"{file_name} does not correspond with table in database") db_engine.dispose() - return upload_status \ No newline at end of file + return upload_status -- cgit v1.2.3