aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authordeepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com>2024-08-23 16:11:45 +0000
committerGitHub <noreply@github.com>2024-08-23 16:11:45 +0000
commit69edb14dad584d45fa6a83a90c08292b84795507 (patch)
tree099f687557ba2cc8c379014492e3c05421912edd /src
parent0ff29566a1eb9551bb83bcc07705c932d22f8c08 (diff)
downloadde-project-bentley-69edb14dad584d45fa6a83a90c08292b84795507.tar.gz
de-project-bentley-69edb14dad584d45fa6a83a90c08292b84795507.zip
style: format code with Autopep8, Black and Ruff Formatter
This commit fixes the style issues introduced in 0ff2956 according to the output from Autopep8, Black and Ruff Formatter. Details: https://github.com/ajschofield/de-project-bentley/pull/95
Diffstat (limited to 'src')
-rw-r--r--src/load_lambda.py75
1 files changed, 51 insertions, 24 deletions
diff --git a/src/load_lambda.py b/src/load_lambda.py
index 8eaea32..6e6bc80 100644
--- a/src/load_lambda.py
+++ b/src/load_lambda.py
@@ -40,6 +40,7 @@ def lambda_handler(event, context):
logger.error(f"Error: {e}", exc_info=True)
return {"statusCode": 500, "body": json.dumps("Internal server error.")}
+
def retrieve_secrets():
secret_name = "bentley-RDS-credentials"
region_name = "eu-west-2"
@@ -59,7 +60,10 @@ def retrieve_secrets():
return get_secret_value_response["SecretString"]
+
# connect to database, slightly different way of doing it, to allow manipulation through pandas
+
+
def connect_to_db_and_return_engine():
try:
secrets = json.loads(retrieve_secrets())
@@ -68,13 +72,14 @@ def connect_to_db_and_return_engine():
user = secrets["user"]
password = secrets["password"]
database = secrets["database"]
- conn_str = f'postgresql+pg8000://{user}:{password}@{host}:{port}/{database}'
- engine = create_engine(conn_str) #interface between python (pandas) and SQL
+ conn_str = f"postgresql+pg8000://{user}:{password}@{host}:{port}/{database}"
+ # interface between python (pandas) and SQL
+ engine = create_engine(conn_str)
return engine
except Exception as e:
logger.error(f"Interface error: {e}")
raise RuntimeError("Failed to create database engine")
-
+
# get transform bucket
def get_transform_bucket(client=None):
@@ -85,9 +90,11 @@ def get_transform_bucket(client=None):
except ClientError as e:
logger.error(f"Error listing S3 buckets: {e}")
raise RuntimeError("Error listing S3 buckets")
-
+
transform_bucket_filter = [
- bucket["Name"] for bucket in response["Buckets"] if "transform" in bucket["Name"]
+ bucket["Name"]
+ for bucket in response["Buckets"]
+ if "transform" in bucket["Name"]
]
if not transform_bucket_filter:
@@ -96,9 +103,12 @@ def get_transform_bucket(client=None):
return transform_bucket_filter[0]
+
# list and then retrieve parquet files from S3 bucket
# convert parquet files into dataframes
-# return a dictionary of dataframes with name as key, and dataframe object as value
+# return a dictionary of dataframes with name as key, and dataframe object as value
+
+
def convert_parquet_files_to_dfs(bucket_name=None, client=None):
try:
if client is None:
@@ -110,10 +120,10 @@ def convert_parquet_files_to_dfs(bucket_name=None, client=None):
dfs = {}
if "Contents" in files:
for file in files["Contents"]:
- file_key = file['Key']
+ file_key = file["Key"]
try:
file_obj = client.get_object(Bucket=bucket_name, Key=file_key)
- parquet_file = pq.ParquetFile(BytesIO(file_obj['Body'].read()))
+ parquet_file = pq.ParquetFile(BytesIO(file_obj["Body"].read()))
df = parquet_file.read().to_pandas()
dfs[file_key] = df
except ClientError as e:
@@ -132,34 +142,51 @@ def convert_parquet_files_to_dfs(bucket_name=None, client=None):
return dfs
+
def upload_dfs_to_database():
upload_status = {"uploaded": [], "not_uploaded": []}
dict_of_dfs = convert_parquet_files_to_dfs()
db_engine = connect_to_db_and_return_engine()
- immutable_df_dict = ["dim_counterparty.parquet",
- "dim_date.parquet", #this needs to be mutable
- "dim_location.parquet",
- "dim_staff.parquet",
- "dim_design.parquet"]
- mutable_df_dict = ["fact_sales_order",
- "fact_purchase_order",
- "fact_payment",
- "dim_currency"]
-
+ immutable_df_dict = [
+ "dim_counterparty.parquet",
+ "dim_date.parquet", # this needs to be mutable
+ "dim_location.parquet",
+ "dim_staff.parquet",
+ "dim_design.parquet",
+ ]
+ mutable_df_dict = [
+ "fact_sales_order",
+ "fact_purchase_order",
+ "fact_payment",
+ "dim_currency",
+ ]
+
for file_name, df in dict_of_dfs.items():
if file_name in immutable_df_dict:
table_name = file_name.split(".")[0]
try:
- df.to_sql(table_name, con=db_engine, schema="project_team_2", if_exists="overwrite", index=False)
+ df.to_sql(
+ table_name,
+ con=db_engine,
+ schema="project_team_2",
+ if_exists="overwrite",
+ index=False,
+ )
upload_status["uploaded"].append(table_name)
except Exception as e:
logger.error(f"Error uploading dataframe {file_name} to database: {e}")
raise
- elif file_name.rsplit('_', 1)[0] in mutable_df_dict:
- table_name = file_name.rsplit('_', 1)[0]
+ elif file_name.rsplit("_", 1)[0] in mutable_df_dict:
+ table_name = file_name.rsplit("_", 1)[0]
try:
- df.to_sql(table_name, con=db_engine, schema="project_team_2", if_exists="overwrite", index=False)
- upload_status["uploaded"].append(table_name)
+ df.to_sql(
+ table_name,
+ con=db_engine,
+ schema="project_team_2",
+ if_exists="overwrite",
+ index=False,
+ )
+ upload_status["uploaded"].append(table_name)
except Exception as e:
logger.error(f"Error uploading dataframe {file_name} to database: {e}")
raise
@@ -167,4 +194,4 @@ def upload_dfs_to_database():
upload_status["not_uploaded"].append(file_name)
logger.error(f"{file_name} does not correspond with table in database")
db_engine.dispose()
- return upload_status \ No newline at end of file
+ return upload_status
git.ajschof.me — hosted by ajschofield — powered by cgit