aboutsummaryrefslogtreecommitdiffstats
path: root/src/transform_lambda.py
diff options
context:
space:
mode:
authordeepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com>2024-08-23 10:11:40 +0000
committerGitHub <noreply@github.com>2024-08-23 10:11:40 +0000
commit2231ea89329bd500f7371b7395f5208f7a86c20e (patch)
tree620c86177c81d3a17c0dccf16c2a6890729333e0 /src/transform_lambda.py
parent8e20c5c0f43d0f0c4983c8895396de7f62b7c390 (diff)
downloadde-project-bentley-2231ea89329bd500f7371b7395f5208f7a86c20e.tar.gz
de-project-bentley-2231ea89329bd500f7371b7395f5208f7a86c20e.zip
style: format code with Autopep8, Black and Ruff Formatter
This commit fixes the style issues introduced in 8e20c5c according to the output from Autopep8, Black and Ruff Formatter. Details: https://github.com/ajschofield/de-project-bentley/pull/93
Diffstat (limited to 'src/transform_lambda.py')
-rw-r--r--src/transform_lambda.py100
1 files changed, 56 insertions, 44 deletions
diff --git a/src/transform_lambda.py b/src/transform_lambda.py
index d30d91d..3e74ee0 100644
--- a/src/transform_lambda.py
+++ b/src/transform_lambda.py
@@ -6,12 +6,14 @@ import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from src.dataframes import *
+
# from src.extract_lambda import extract_bucket, DBConnectionException
import boto3
from botocore.exceptions import ClientError
from pg8000.native import Connection, InterfaceError
from datetime import datetime
+
class DBConnectionException(Exception):
"""Wraps pg8000.native Error or DatabaseError."""
@@ -20,6 +22,7 @@ class DBConnectionException(Exception):
self.message = str(e)
super().__init__(self.message)
+
logger = logging.getLogger(__name__)
logging.basicConfig(
@@ -45,44 +48,45 @@ tables = [
"payment_type",
]
+
def lambda_handler(event, context):
db = None
-
- try:
+
+ try:
db = connect_to_database()
- bucket = bucket_name('transform')
+ bucket = bucket_name("transform")
existing_s3_files = list_existing_s3_files(bucket)
- dict_of_df = read_from_s3_subfolder_to_df(tables, extract_bucket(), client=boto3.client("s3"))
+ dict_of_df = read_from_s3_subfolder_to_df(
+ tables, extract_bucket(), client=boto3.client("s3")
+ )
immutable_df_dict = {
- 'dim_counterparty': create_dim_counterparty(dict_of_df),
- 'dim_date': create_dim_date(dict_of_df),
- 'dim_location': create_dim_location(dict_of_df),
- 'dim_staff': create_dim_staff(dict_of_df),
- 'dim_design': create_dim_design(dict_of_df)}
-
+ "dim_counterparty": create_dim_counterparty(dict_of_df),
+ "dim_date": create_dim_date(dict_of_df),
+ "dim_location": create_dim_location(dict_of_df),
+ "dim_staff": create_dim_staff(dict_of_df),
+ "dim_design": create_dim_design(dict_of_df),
+ }
mutable_df_dict = {
- 'fact_sales_order': create_fact_sales_order(dict_of_df),
- 'fact_purchase_order': create_fact_purchase_orders(dict_of_df),
- 'fact_payment': create_fact_payment(dict_of_df),
- 'dim_currency': create_dim_currency(dict_of_df)}
-
+ "fact_sales_order": create_fact_sales_order(dict_of_df),
+ "fact_purchase_order": create_fact_purchase_orders(dict_of_df),
+ "fact_payment": create_fact_payment(dict_of_df),
+ "dim_currency": create_dim_currency(dict_of_df),
+ }
+
status = process_to_parquet_and_upload_to_s3(
- existing_s3_files,
- immutable_df_dict,
- mutable_df_dict,
- bucket
+ existing_s3_files, immutable_df_dict, mutable_df_dict, bucket
)
-
- if not status['uploaded']:
+
+ if not status["uploaded"]:
logger.info("No dataframes written to the bucket.")
return {
- 'statusCode': 204,
- "body": json.dumps("No files where uploaded."),
+ "statusCode": 204,
+ "body": json.dumps("No files where uploaded."),
}
-
+
return {
"statusCode": 200,
"body": json.dumps(
@@ -90,7 +94,7 @@ def lambda_handler(event, context):
'The following tables were not uploaded: '+', '.join([status['not_uploaded']]) if status['not_uploaded'] else ''}"""
),
}
-
+
except Exception as e:
logger.error(f"Error: {e}", exc_info=True)
return {"statusCode": 500, "body": json.dumps("Internal server error.")}
@@ -99,34 +103,38 @@ def lambda_handler(event, context):
db.close()
-def process_to_parquet_and_upload_to_s3(existing_s3_files,
- immutable_df_dict,
- mutable_df_dict,
- bucket,
- client=boto3.client('s3')):
- status = {'uploaded': [],
- 'not_uploaded': []}
+def process_to_parquet_and_upload_to_s3(
+ existing_s3_files,
+ immutable_df_dict,
+ mutable_df_dict,
+ bucket,
+ client=boto3.client("s3"),
+):
+ status = {"uploaded": [], "not_uploaded": []}
for table_name, df in immutable_df_dict.items():
if table_name in existing_s3_files:
- status['not_uploaded'].append(table_name)
+ status["not_uploaded"].append(table_name)
else:
- parquet_file = df.to_parquet(f'{table_name}.parquet', engine='pyarrow') #or fastparquet
- client.upload_file(parquet_file, bucket, f'{table_name}.parquet')
- status['uploaded'].append(table_name)
+ parquet_file = df.to_parquet(
+ f"{table_name}.parquet", engine="pyarrow"
+ ) # or fastparquet
+ client.upload_file(parquet_file, bucket, f"{table_name}.parquet")
+ status["uploaded"].append(table_name)
for table_name, df in mutable_df_dict.items():
s3_key = datetime.strftime(
- datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.parquet")
- parquet_file = df.to_parquet(f'{table_name}.parquet', engine='pyarrow') #or fastparquet
+ datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.parquet"
+ )
+ parquet_file = df.to_parquet(
+ f"{table_name}.parquet", engine="pyarrow"
+ ) # or fastparquet
client.upload_file(parquet_file, bucket, s3_key)
- status['uploaded'].append(table_name)
-
+ status["uploaded"].append(table_name)
return status
-
def retrieve_secrets():
secret_name = "bentley-secrets"
region_name = "eu-west-2"
@@ -175,19 +183,23 @@ def read_from_s3_subfolder_to_df(tables, bucket, client=boto3.client("s3")):
table_dfs[table] = pd.concat(list_of_df)
return table_dfs
+
def bucket_name(bucket_prefix, client=boto3.client("s3")):
response = client.list_buckets()
bucket_filter = [
- bucket["Name"] for bucket in response["Buckets"] if bucket_prefix in bucket["Name"]
+ bucket["Name"]
+ for bucket in response["Buckets"]
+ if bucket_prefix in bucket["Name"]
]
return bucket_filter[0]
+
def list_existing_s3_files(bucket_name, client=boto3.client("s3")):
logging.info("Listing existing S3 files")
try:
- response = client.list_objects_v2(Bucket=bucket_name)
+ response = client.list_objects_v2(Bucket=bucket_name)
if "Contents" in response:
existing_files = [obj["Key"] for obj in response["Contents"]]
@@ -198,4 +210,4 @@ def list_existing_s3_files(bucket_name, client=boto3.client("s3")):
except ClientError as e:
logger.error(f"Error listing S3 objects: {e}")
- return existing_files \ No newline at end of file
+ return existing_files
git.ajschof.me — hosted by ajschofield — powered by cgit