diff options
Diffstat (limited to 'src/transform_lambda.py')
| -rw-r--r-- | src/transform_lambda.py | 198 |
1 files changed, 157 insertions, 41 deletions
diff --git a/src/transform_lambda.py b/src/transform_lambda.py index 6024a24..d30d91d 100644 --- a/src/transform_lambda.py +++ b/src/transform_lambda.py @@ -1,13 +1,35 @@ import json import boto3 import re +import logging import pandas as pd import pyarrow as pa import pyarrow.parquet as pq -from src.extract_lambda import extract_bucket -from src.fact_purchase_table import * -from src.fact_sales_order import create_dim_staff, create_dim_design, create_fact_sales_order +from src.dataframes import * +# from src.extract_lambda import extract_bucket, DBConnectionException +import boto3 +from botocore.exceptions import ClientError +from pg8000.native import Connection, InterfaceError +from datetime import datetime + +class DBConnectionException(Exception): + """Wraps pg8000.native Error or DatabaseError.""" + + def __init__(self, e): + """Initialise with provided error message.""" + self.message = str(e) + super().__init__(self.message) + +logger = logging.getLogger(__name__) +logging.basicConfig( + format="{asctime} - {levelname} - {message}", + style="{", + datefmt="%Y-%m-%d %H:%M", + level=logging.DEBUG, +) + +logging.getLogger("botocore").setLevel(logging.WARNING) tables = [ "sales_order", @@ -24,47 +46,124 @@ tables = [ ] def lambda_handler(event, context): - dict_of_df = read_from_s3_subfolder_to_df(tables, extract_bucket(), client=boto3.client("s3")) - common_df_list = [create_dim_counterparty(dict_of_df), - create_dim_date(dict_of_df), - create_dim_location(dict_of_df), - create_dim_currency(dict_of_df), - create_dim_staff(dict_of_df)] + db = None - create_fact_purchase_order() + try: + db = connect_to_database() + bucket = bucket_name('transform') + existing_s3_files = list_existing_s3_files(bucket) - f_sales_list = [create_fact_sales_order(), - create_dim_design()] - - - ''' - #dict{ - sales_schema: { - Table_name: df_value, - ...} - payment_schema: - Table_name: df_value, - ...} - purchase_schema: - Table_name: df_value, - ...} - } - - for schema in dict: - for table_name, df_value in schema.items(): - parquet_file = df_value.to_parquet(f'{table_name}.parquet', engine='pyarrow'/'fastparquet'(?)) #we don't know the engine - - s3_key = datetime.strftime( - datetime.today(), f"{schema}/%Y/%m/%d/{table_name}_%H:%M:%S.parquet" - ) - - client.upload_file( - parquet_file, transform_bucket(), s3_key) - ##might need seperate function for easier testing## - ''' + dict_of_df = read_from_s3_subfolder_to_df(tables, extract_bucket(), client=boto3.client("s3")) + + immutable_df_dict = { + 'dim_counterparty': create_dim_counterparty(dict_of_df), + 'dim_date': create_dim_date(dict_of_df), + 'dim_location': create_dim_location(dict_of_df), + 'dim_staff': create_dim_staff(dict_of_df), + 'dim_design': create_dim_design(dict_of_df)} + + + mutable_df_dict = { + 'fact_sales_order': create_fact_sales_order(dict_of_df), + 'fact_purchase_order': create_fact_purchase_orders(dict_of_df), + 'fact_payment': create_fact_payment(dict_of_df), + 'dim_currency': create_dim_currency(dict_of_df)} + + status = process_to_parquet_and_upload_to_s3( + existing_s3_files, + immutable_df_dict, + mutable_df_dict, + bucket + ) + + if not status['uploaded']: + logger.info("No dataframes written to the bucket.") + return { + 'statusCode': 204, + "body": json.dumps("No files where uploaded."), + } + + return { + "statusCode": 200, + "body": json.dumps( + f"""Parquet files processed for {', '.join(status['uploaded'])} and uploaded successfully.{ + 'The following tables were not uploaded: '+', '.join([status['not_uploaded']]) if status['not_uploaded'] else ''}""" + ), + } + + except Exception as e: + logger.error(f"Error: {e}", exc_info=True) + return {"statusCode": 500, "body": json.dumps("Internal server error.")} + finally: + if db: + db.close() + + +def process_to_parquet_and_upload_to_s3(existing_s3_files, + immutable_df_dict, + mutable_df_dict, + bucket, + client=boto3.client('s3')): + status = {'uploaded': [], + 'not_uploaded': []} + + for table_name, df in immutable_df_dict.items(): + if table_name in existing_s3_files: + status['not_uploaded'].append(table_name) + else: + parquet_file = df.to_parquet(f'{table_name}.parquet', engine='pyarrow') #or fastparquet + client.upload_file(parquet_file, bucket, f'{table_name}.parquet') + status['uploaded'].append(table_name) + + for table_name, df in mutable_df_dict.items(): + s3_key = datetime.strftime( + datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.parquet") + parquet_file = df.to_parquet(f'{table_name}.parquet', engine='pyarrow') #or fastparquet + client.upload_file(parquet_file, bucket, s3_key) + status['uploaded'].append(table_name) + + + return status +def retrieve_secrets(): + secret_name = "bentley-secrets" + region_name = "eu-west-2" + + # Create a Secrets Manager client + session = boto3.session.Session() + client = session.client(service_name="secretsmanager", region_name=region_name) + + try: + get_secret_value_response = client.get_secret_value(SecretId=secret_name) + except ClientError as e: + logger.error(f"Failed to retrieve secret {secret_name}: {str(e)}") + raise e + except KeyError: + logger.error(f"Secret {secret_name} does not contain a SecretString") + raise ValueError(f"Secret {secret_name} does not contain a SecretString") + + return get_secret_value_response["SecretString"] + + +def connect_to_database() -> Connection: + try: + secrets = json.loads(retrieve_secrets()) + host = secrets["host"] + port = secrets["port"] + user = secrets["user"] + password = secrets["password"] + database = secrets["database"] + + return Connection( + database=database, user=user, password=password, host=host, port=port + ) + except InterfaceError as i: + logger.error(f"Interface error: {i}") + raise DBConnectionException("Failed to connect to database") + + def read_from_s3_subfolder_to_df(tables, bucket, client=boto3.client("s3")): table_dfs = {} for table in tables: @@ -76,10 +175,27 @@ def read_from_s3_subfolder_to_df(tables, bucket, client=boto3.client("s3")): table_dfs[table] = pd.concat(list_of_df) return table_dfs -def transform_bucket(client=boto3.client("s3")): +def bucket_name(bucket_prefix, client=boto3.client("s3")): response = client.list_buckets() bucket_filter = [ - bucket["Name"] for bucket in response["Buckets"] if "transform" in bucket["Name"] + bucket["Name"] for bucket in response["Buckets"] if bucket_prefix in bucket["Name"] ] return bucket_filter[0] + +def list_existing_s3_files(bucket_name, client=boto3.client("s3")): + logging.info("Listing existing S3 files") + + try: + response = client.list_objects_v2(Bucket=bucket_name) + + if "Contents" in response: + existing_files = [obj["Key"] for obj in response["Contents"]] + else: + logger.error("The bucket is empty") + return None + + except ClientError as e: + logger.error(f"Error listing S3 objects: {e}") + + return existing_files
\ No newline at end of file |
