aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbulve-ad <78788030+bulve-ad@users.noreply.github.com>2024-08-27 17:03:37 +0100
committerGitHub <noreply@github.com>2024-08-27 17:03:37 +0100
commita6ab5152a2294021d852fcc8c04f2e8eff931a45 (patch)
treef128e330b514a1d72fc8eeed915679adf1954aed
parentdc095acd4d5b9f73a716a076ce601c3810f9635b (diff)
parent6761a7a8558079dc3107d69f2f3affc67e1577ae (diff)
downloadde-project-bentley-a6ab5152a2294021d852fcc8c04f2e8eff931a45.tar.gz
de-project-bentley-a6ab5152a2294021d852fcc8c04f2e8eff931a45.zip
Merge branch 'development' into test/transform-helper-functions
-rw-r--r--requirements.txt3
-rw-r--r--src/load_lambda.py199
-rw-r--r--tests/test_load_lambda.py92
3 files changed, 291 insertions, 3 deletions
diff --git a/requirements.txt b/requirements.txt
index 0c81216..763b95a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -30,5 +30,6 @@ Werkzeug==3.0.3
xmltodict==0.13.0
s3fs
pandas
+pyarrow
+SQLAlchemy
bs4
-pyarrow \ No newline at end of file
diff --git a/src/load_lambda.py b/src/load_lambda.py
index c6a8e60..6e6bc80 100644
--- a/src/load_lambda.py
+++ b/src/load_lambda.py
@@ -1,2 +1,197 @@
-def lambda_handler():
- pass
+import boto3
+from botocore.exceptions import ClientError
+import pandas as pd
+import pyarrow.parquet as pq
+from io import BytesIO
+import logging
+import json
+from src.extract_lambda import retrieve_secrets
+from sqlalchemy import create_engine
+
+
+# Module-level logger used by every function in this file.
+logger = logging.getLogger(__name__)
+
+# Root logger configuration: "{"-style format fields, minute-resolution
+# timestamps, and everything from DEBUG upwards.
+logging.basicConfig(
+    level=logging.DEBUG,
+    datefmt="%Y-%m-%d %H:%M",
+    style="{",
+    format="{asctime} - {levelname} - {message}",
+)
+
+# The root level is DEBUG; pin botocore to INFO so its DEBUG records are dropped.
+logging.getLogger("botocore").setLevel(logging.INFO)
+
+
+def lambda_handler(event, context):
+    """AWS Lambda entry point: upload the transformed dataframes to the database.
+
+    Args:
+        event: Invocation payload; not read by this handler.
+        context: Lambda runtime context; not read by this handler.
+
+    Returns:
+        dict: ``statusCode`` 200 with a JSON-encoded message listing the
+        uploaded tables (or stating that none were uploaded), or ``statusCode``
+        500 with a generic message when any exception is raised during the
+        upload.
+    """
+    try:
+        uploaded = upload_dfs_to_database()["uploaded"]
+        if not uploaded:
+            message = "No dataframes were uploaded."
+        else:
+            # Single-line message: a triple-quoted f-string here would embed a
+            # newline and the source indentation in the response body.
+            message = (
+                "The following dataframes were uploaded successfully: "
+                f"{uploaded}."
+            )
+        return {"statusCode": 200, "body": json.dumps(message)}
+    except Exception as e:
+        # Top-level boundary: log with traceback, return a generic error body.
+        logger.error(f"Error: {e}", exc_info=True)
+        return {"statusCode": 500, "body": json.dumps("Internal server error.")}
+
+
+def retrieve_secrets():
+    """Fetch the ``bentley-RDS-credentials`` secret from AWS Secrets Manager.
+
+    NOTE: this definition rebinds the ``retrieve_secrets`` name that is also
+    imported from ``src.extract_lambda`` at the top of this module, so calls in
+    this file resolve to this function rather than the imported one.
+
+    Returns:
+        str: The raw ``SecretString`` payload of the secret.
+
+    Raises:
+        ClientError: If Secrets Manager rejects or fails the request.
+        ValueError: If the response does not contain a ``SecretString``.
+    """
+    secret_name = "bentley-RDS-credentials"
+    region_name = "eu-west-2"
+
+    # Create a Secrets Manager client
+    session = boto3.session.Session()
+    client = session.client(service_name="secretsmanager", region_name=region_name)
+
+    try:
+        get_secret_value_response = client.get_secret_value(SecretId=secret_name)
+    except ClientError as e:
+        logger.error(f"Failed to retrieve secret {secret_name}: {str(e)}")
+        raise
+
+    # The key lookup has its own try block so that a missing SecretString is
+    # actually translated into the documented ValueError.
+    try:
+        return get_secret_value_response["SecretString"]
+    except KeyError:
+        logger.error(f"Secret {secret_name} does not contain a SecretString")
+        raise ValueError(
+            f"Secret {secret_name} does not contain a SecretString"
+        ) from None
+
+
+def connect_to_db_and_return_engine():
+    """Create a SQLAlchemy engine from the credentials held in Secrets Manager.
+
+    An engine is returned (rather than a raw driver connection) because it is
+    the interface pandas uses to talk to SQL databases.
+
+    Returns:
+        A SQLAlchemy engine for a ``postgresql+pg8000`` URL built from the
+        ``host``, ``port``, ``user``, ``password`` and ``database`` keys of the
+        JSON secret.
+
+    Raises:
+        RuntimeError: If the secret cannot be retrieved or parsed, a key is
+            missing, or the engine cannot be created. The original exception
+            is chained as ``__cause__``.
+    """
+    # Function-scope import keeps the new dependency local to this block.
+    from urllib.parse import quote_plus
+
+    try:
+        secrets = json.loads(retrieve_secrets())
+        host = secrets["host"]
+        port = secrets["port"]
+        database = secrets["database"]
+        # Percent-encode the credentials: characters such as "@", ":" or "/"
+        # in a password would otherwise be parsed as URL delimiters.
+        user = quote_plus(str(secrets["user"]))
+        password = quote_plus(str(secrets["password"]))
+        conn_str = f"postgresql+pg8000://{user}:{password}@{host}:{port}/{database}"
+        # interface between python (pandas) and SQL
+        return create_engine(conn_str)
+    except Exception as e:
+        logger.error(f"Interface error: {e}")
+        raise RuntimeError("Failed to create database engine") from e
+
+
+def get_transform_bucket(client=None):
+    """Return the name of the first S3 bucket whose name contains "transform".
+
+    Args:
+        client: boto3 S3 client to query; a default client is created when
+            ``None``.
+
+    Returns:
+        str: The first matching bucket name, in the order ``list_buckets``
+        returned the buckets.
+
+    Raises:
+        RuntimeError: If listing the buckets fails with a ``ClientError`` (the
+            original error is chained as ``__cause__``).
+        ValueError: If no bucket name contains "transform".
+    """
+    if client is None:
+        client = boto3.client("s3")
+    try:
+        response = client.list_buckets()
+    except ClientError as e:
+        logger.error(f"Error listing S3 buckets: {e}")
+        # Chain the cause so the underlying AWS error stays in the traceback.
+        raise RuntimeError("Error listing S3 buckets") from e
+
+    # Stop at the first match instead of building a list only to index it.
+    transform_bucket = next(
+        (
+            bucket["Name"]
+            for bucket in response["Buckets"]
+            if "transform" in bucket["Name"]
+        ),
+        None,
+    )
+
+    if transform_bucket is None:
+        logger.error("No transform bucket found")
+        raise ValueError("No transform bucket found")
+
+    return transform_bucket
+
+
+def convert_parquet_files_to_dfs(bucket_name=None, client=None):
+    """Download every object in a bucket and load each one as a dataframe.
+
+    Args:
+        bucket_name: Bucket to read. When ``None`` it is looked up with
+            ``get_transform_bucket``.
+        client: boto3 S3 client. When ``None`` a default client is created.
+
+    Returns:
+        dict: Maps each object key to the pandas DataFrame read from it.
+        Objects that cannot be downloaded or parsed as parquet are logged and
+        skipped. An empty dict is returned when the bucket holds no objects.
+
+    Raises:
+        ValueError: Re-raised after logging (e.g. no transform bucket found).
+        ClientError: Re-raised after logging when listing the objects fails.
+    """
+    try:
+        if client is None:
+            client = boto3.client("s3")
+        if bucket_name is None:
+            # Reuse the caller's client so an injected client is honoured for
+            # bucket discovery as well as for the object reads.
+            bucket_name = get_transform_bucket(client)
+
+        # Paginate: a single list_objects_v2 call returns a limited number of
+        # keys, so larger buckets would otherwise be silently truncated.
+        file_keys = []
+        paginator = client.get_paginator("list_objects_v2")
+        for page in paginator.paginate(Bucket=bucket_name):
+            for file in page.get("Contents", []):
+                file_keys.append(file["Key"])
+    except ValueError as value_error:
+        logger.error(f"Unable to list objects: {value_error}")
+        raise
+    except ClientError as client_error:
+        logger.error(f"Unable to list objects: {client_error}")
+        raise
+
+    if not file_keys:
+        logger.error(f"No files found in {bucket_name}.")
+        return {}
+
+    dfs = {}
+    for file_key in file_keys:
+        # Best effort per file: one bad object must not abort the whole load.
+        try:
+            file_obj = client.get_object(Bucket=bucket_name, Key=file_key)
+            parquet_file = pq.ParquetFile(BytesIO(file_obj["Body"].read()))
+            dfs[file_key] = parquet_file.read().to_pandas()
+        except ClientError as e:
+            logger.error(f"Unable to retrieve S3 object {file_key}: {e}")
+        except Exception as e:
+            logger.error(f"Unable to process file {file_key}: {e}")
+
+    return dfs
+
+
+def upload_dfs_to_database():
+    """Write the dataframes read from the transform bucket to the database.
+
+    Each object key is mapped to a table in the ``project_team_2`` schema:
+    keys listed in ``immutable_files`` map to the key without its extension;
+    otherwise the text before the last underscore must be one of
+    ``mutable_tables``. Keys matching neither rule are recorded as not
+    uploaded.
+
+    Returns:
+        dict: ``{"uploaded": [table names], "not_uploaded": [object keys]}``.
+
+    Raises:
+        Exception: Any error raised by ``DataFrame.to_sql`` is logged and
+            re-raised. The engine is disposed in every case.
+    """
+    immutable_files = {
+        "dim_counterparty.parquet",
+        "dim_date.parquet",  # this needs to be mutable
+        "dim_location.parquet",
+        "dim_staff.parquet",
+        "dim_design.parquet",
+    }
+    mutable_tables = {
+        "fact_sales_order",
+        "fact_purchase_order",
+        "fact_payment",
+        "dim_currency",
+    }
+
+    upload_status = {"uploaded": [], "not_uploaded": []}
+    dict_of_dfs = convert_parquet_files_to_dfs()
+    db_engine = connect_to_db_and_return_engine()
+
+    try:
+        for file_name, df in dict_of_dfs.items():
+            mutable_candidate = file_name.rsplit("_", 1)[0]
+            if file_name in immutable_files:
+                table_name = file_name.split(".")[0]
+            elif mutable_candidate in mutable_tables:
+                table_name = mutable_candidate
+            else:
+                upload_status["not_uploaded"].append(file_name)
+                logger.error(f"{file_name} does not correspond with table in database")
+                continue
+
+            try:
+                # pandas accepts only "fail", "replace" or "append" here;
+                # "replace" is the valid spelling of the intended overwrite.
+                # NOTE(review): with several files per mutable table each
+                # upload replaces the previous one - confirm whether "append"
+                # is wanted for those tables.
+                df.to_sql(
+                    table_name,
+                    con=db_engine,
+                    schema="project_team_2",
+                    if_exists="replace",
+                    index=False,
+                )
+            except Exception as e:
+                logger.error(f"Error uploading dataframe {file_name} to database: {e}")
+                raise
+            upload_status["uploaded"].append(table_name)
+    finally:
+        # Release pooled connections even when an upload fails.
+        db_engine.dispose()
+
+    return upload_status
diff --git a/tests/test_load_lambda.py b/tests/test_load_lambda.py
new file mode 100644
index 0000000..88c71e4
--- /dev/null
+++ b/tests/test_load_lambda.py
@@ -0,0 +1,92 @@
+import pandas as pd
+import pyarrow.parquet as pq
+from io import BytesIO
+from moto import mock_aws
+import boto3
+import os
+import pytest
+from src.load_lambda import (
+ lambda_handler,
+ connect_to_db_and_return_engine,
+ get_transform_bucket,
+ convert_parquet_files_to_dfs,
+ upload_dfs_to_database,
+)
+
+
+@pytest.fixture(scope="class")
+def aws_credentials():
+    """Set dummy AWS credentials and a default region in the environment."""
+    os.environ["AWS_ACCESS_KEY_ID"] = "testing"
+    os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
+    # Fixed spelling: the variable is AWS_SECURITY_TOKEN, not AWS_SECURIT_TOKEN.
+    os.environ["AWS_SECURITY_TOKEN"] = "testing"
+    os.environ["AWS_SESSION_TOKEN"] = "testing"
+    os.environ["AWS_DEFAULT_REGION"] = "eu-west-2"
+
+
+@pytest.fixture(scope="class")
+def mock_s3_client(aws_credentials):
+    """Yield an S3 client created inside ``mock_aws``, shared by one test class."""
+    with mock_aws():
+        s3_client = boto3.client("s3")
+        yield s3_client
+
+
+# Placeholder: this test class has an empty body, so it contains no tests yet.
+class TestLambdaHandler:
+ pass
+
+
+# Placeholder: this test class has an empty body, so it contains no tests yet.
+class TestRetrieveSecrets:
+ pass
+
+
+# Placeholder: this test class has an empty body, so it contains no tests yet.
+class TestConnectToDBAndReturnEngine:
+ pass
+
+
+class TestGetTransformBucket:
+    """Tests for ``get_transform_bucket`` against a mocked S3 account."""
+
+    @pytest.fixture
+    def mock_s3_client(self, aws_credentials):
+        """Give every test its own empty mocked S3 account.
+
+        Overrides the same-named fixture for this class only, so no test
+        depends on buckets created by an earlier test or on execution order.
+        """
+        with mock_aws():
+            yield boto3.client("s3")
+
+    def test_raises_value_error_if_no_buckets(self, mock_s3_client):
+        with pytest.raises(ValueError, match="No transform bucket found"):
+            get_transform_bucket(mock_s3_client)
+
+    def test_raises_value_error_if_no_transform_bucket(self, mock_s3_client):
+        mock_s3_client.create_bucket(
+            Bucket="extract_bucket",
+            CreateBucketConfiguration={"LocationConstraint": "eu-west-2"},
+        )
+        with pytest.raises(ValueError, match="No transform bucket found"):
+            get_transform_bucket(mock_s3_client)
+
+    def test_returns_transform_bucket_if_one_bucket(self, mock_s3_client):
+        mock_s3_client.create_bucket(
+            Bucket="transform_bucket",
+            CreateBucketConfiguration={"LocationConstraint": "eu-west-2"},
+        )
+        result = get_transform_bucket(mock_s3_client)
+        assert result == "transform_bucket"
+
+    def test_only_returns_transform_bucket_if_several_buckets(self, mock_s3_client):
+        # Create every bucket this test needs rather than relying on a bucket
+        # left behind by another test.
+        for bucket_name in ("extract_bucket", "transform_bucket", "another_test_bucket"):
+            mock_s3_client.create_bucket(
+                Bucket=bucket_name,
+                CreateBucketConfiguration={"LocationConstraint": "eu-west-2"},
+            )
+        result = get_transform_bucket(mock_s3_client)
+        assert result == "transform_bucket"
+
+
+class TestConvertParquetToDfs:
+    """Tests for ``convert_parquet_files_to_dfs`` against a mocked S3 bucket."""
+
+    @pytest.fixture
+    def mock_s3_client(self, aws_credentials):
+        """Give every test its own empty mocked S3 account.
+
+        Overrides the same-named fixture for this class only, so each test can
+        create ``transform_bucket`` itself without clashing with another test.
+        """
+        with mock_aws():
+            yield boto3.client("s3")
+
+    def test_function_returns_empty_dictionary_if_no_files(self, mock_s3_client):
+        mock_s3_client.create_bucket(
+            Bucket="transform_bucket",
+            CreateBucketConfiguration={"LocationConstraint": "eu-west-2"},
+        )
+        result = convert_parquet_files_to_dfs(
+            bucket_name="transform_bucket", client=mock_s3_client
+        )
+        assert result == {}
+
+    def test_function_returns_dictionary_with_table_with_file_key(
+        self, mock_s3_client
+    ):
+        mock_s3_client.create_bucket(
+            Bucket="transform_bucket",
+            CreateBucketConfiguration={"LocationConstraint": "eu-west-2"},
+        )
+        # Serialise a small dataframe to parquet in memory and upload it.
+        staff_df = pd.DataFrame({"staff_id": [1, 2], "first_name": ["Ada", "Alan"]})
+        buffer = BytesIO()
+        staff_df.to_parquet(buffer, index=False)
+        mock_s3_client.put_object(
+            Bucket="transform_bucket",
+            Key="dim_staff.parquet",
+            Body=buffer.getvalue(),
+        )
+
+        result = convert_parquet_files_to_dfs(
+            bucket_name="transform_bucket", client=mock_s3_client
+        )
+
+        # The function keys its result by object key, extension included.
+        assert list(result) == ["dim_staff.parquet"]
+        assert list(result["dim_staff.parquet"].columns) == ["staff_id", "first_name"]
+        assert result["dim_staff.parquet"]["staff_id"].tolist() == [1, 2]
+
+
+# Placeholder: this test class has an empty body, so it contains no tests yet.
+class TestUploadDfsToDatabase:
+ pass
git.ajschof.me — hosted by ajschofield — powered by cgit