aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorEllie <ecsymonds@gmail.com>2024-08-23 12:08:09 +0100
committerEllie <ecsymonds@gmail.com>2024-08-23 12:08:09 +0100
commit65289cdd17359c6a29560339e134e0ddf9461ce0 (patch)
tree5edec225b3f531babca3b9b7a10eaa00899e8f27 /src
parent535e3cd919613d4cadfbb42ea8f2ecdd7678f38c (diff)
downloadde-project-bentley-65289cdd17359c6a29560339e134e0ddf9461ce0.tar.gz
de-project-bentley-65289cdd17359c6a29560339e134e0ddf9461ce0.zip
add amendments to load lambda
Diffstat (limited to 'src')
-rw-r--r--src/load_lambda.py66
1 files changed, 37 insertions, 29 deletions
diff --git a/src/load_lambda.py b/src/load_lambda.py
index d95c27a..f92bb45 100644
--- a/src/load_lambda.py
+++ b/src/load_lambda.py
@@ -1,11 +1,11 @@
import boto3
-from botocore.exceptions import ClientError
+from botocore.exceptions import ClientError, InterfaceError
import pandas as pd
import pyarrow.parquet as pq
from io import BytesIO
import logging
import json
-from src.extract_lambda import retrieve_secrets, connect_to_database
+from src.extract_lambda import retrieve_secrets
from sqlalchemy import create_engine
@@ -18,67 +18,74 @@ logging.basicConfig(
level=logging.DEBUG,
)
-logging.getLogger("botocore").setLevel(logging.WARNING)
+logging.getLogger("botocore").setLevel(logging.INFO)
+
def lambda_handler(event, context):
- db = None
try:
uploaded_tables = upload_dfs_to_database()
- if uploaded_tables == []:
+ if not uploaded_tables:
return {
"statusCode": 200,
- "body": json.dumps("No datframes were uploaded."),
+ "body": json.dumps("No dataframes were uploaded."),
}
return {
"statusCode": 200,
"body": json.dumps(
f"""The following dataframes were uploaded successfully:
- {', '.join(upload_dfs_to_database['updated'])}."""
+ {', '.join(uploaded_tables)} ."""
),
}
except Exception as e:
logger.error(f"Error: {e}", exc_info=True)
return {"statusCode": 500, "body": json.dumps("Internal server error.")}
- finally:
- if db:
- db.close()
# connect to database, slightly different way of doing it, to allow manipulation through pandas
def connect_to_db_and_return_engine():
- secrets = json.loads(retrieve_secrets("bentley-RDS-credentials")) #need to amend retrieve secrets function
- host = secrets["host"]
- port = secrets["port"]
- user = secrets["user"]
- password = secrets["password"]
- database = secrets["database"]
- conn_str = f'postgresql+pg8000://{user}:{password}@{host}:{port}/{database}'
- engine = create_engine(conn_str) #interface between python (pandas) and SQL
- return engine
-
-
+ try:
+ secrets = json.loads(retrieve_secrets("bentley-RDS-credentials")) #need to amend retrieve secrets function
+ host = secrets["host"]
+ port = secrets["port"]
+ user = secrets["user"]
+ password = secrets["password"]
+ database = secrets["database"]
+ conn_str = f'postgresql+pg8000://{user}:{password}@{host}:{port}/{database}'
+ engine = create_engine(conn_str) #interface between python (pandas) and SQL
+ return engine
+ except Exception as e:
+ logger.error(f"Interface error: {e}")
+ raise RuntimeError("Failed to create database engine")
+
# get transform bucket
-def transform_bucket(client=None):
+def get_transform_bucket(client=None):
if client is None:
client = boto3.client("s3")
- response = client.list_buckets()
+ try:
+ response = client.list_buckets()
+ except ClientError as e:
+ logger.error(f"Error listing S3 buckets: {e}")
+ raise RuntimeError("Error listing S3 buckets")
+
transform_bucket_filter = [
bucket["Name"] for bucket in response["Buckets"] if "transform" in bucket["Name"]
]
if not transform_bucket_filter:
- raise ValueError("No transform_bucket found")
+ logger.error("No transform bucket found")
+ raise ValueError("No transform bucket found")
return transform_bucket_filter[0]
# list and then retrieve parquet files from S3 bucket
-# convert parquet files into dataframes and return a list of dataframes
+# convert parquet files into dataframes
+# return a dictionary of dataframes with name as key, and dataframe object as value
def convert_parquet_files_to_dfs(bucket_name=None, client=None):
try:
if client is None:
client = boto3.client("s3")
if bucket_name is None:
- bucket_name = transform_bucket(client)
+ bucket_name = get_transform_bucket()
files = client.list_objects_v2(Bucket=bucket_name)
dfs = {}
@@ -96,7 +103,7 @@ def convert_parquet_files_to_dfs(bucket_name=None, client=None):
logger.error(f"Unable to process file {file_key}: {e}")
else:
logger.error(f"No files found in {bucket_name}.")
- return []
+ return {}
except ValueError as value_error:
logger.error(f"Unable to list objects: {value_error}")
raise
@@ -111,11 +118,12 @@ def upload_dfs_to_database():
dict_of_dfs = convert_parquet_files_to_dfs()
db_engine = connect_to_db_and_return_engine()
try:
- for table_name, df in dict_of_dfs:
- df.to_sql(table_name, con=db_engine, ifexists="replace", index=False)
+ for table_name, df in dict_of_dfs.items():
+ df.to_sql(table_name, con=db_engine, if_exists="replace", index=False)
uploaded.append(table_name)
except Exception as e:
logger.error(f"Error uploading dataframes: {e}")
+ raise
db_engine.dispose()
return uploaded
git.ajschof.me — hosted by ajschofield — powered by cgit