aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorT-Aji <tolujbd2@gmail.com>2024-08-13 15:32:33 +0100
committerT-Aji <tolujbd2@gmail.com>2024-08-13 15:32:33 +0100
commitcdb4577b5ad7ae1f708797de6bbf17e289bfac14 (patch)
tree34b6a02e97036a675a2849c1f970b7f52d1fc1e2
parent974a8018f79d8592cbd6a59b1b26a9d288975328 (diff)
downloadde-project-bentley-cdb4577b5ad7ae1f708797de6bbf17e289bfac14.tar.gz
de-project-bentley-cdb4577b5ad7ae1f708797de6bbf17e289bfac14.zip
feat/ add logging & split task into 3 helper functions
-rw-r--r--src/extract_lambda.py140
1 files changed, 109 insertions, 31 deletions
diff --git a/src/extract_lambda.py b/src/extract_lambda.py
index 8317ef8..11ea5d1 100644
--- a/src/extract_lambda.py
+++ b/src/extract_lambda.py
@@ -1,54 +1,132 @@
-from pg8000.native import Connection, Error, DatabaseError, InterfaceError
+from pg8000.native import Connection, DatabaseError, InterfaceError
from dotenv import load_dotenv
import os
import boto3
import csv
from botocore.exceptions import ClientError
+import logging
+import json
+logger = logging.getLogger()
+logger.setLevel(logging.INFO)
load_dotenv()
-def lambda_handler(event, context):
- client = boto3.client('s3')
-# temporary credentials for dev- will not have access when uploaded
+database = os.getenv('database')
+user = os.getenv('user')
+password = os.getenv('password')
+host = os.getenv('host')
+port = os.getenv('port')
+
+def lambda_handler(event, context):
+ """This lambda function connects to the Totesys database, lists the contents of the ingestion bucket,
+ and converts all tables to CSV and if any of those tables do not exist in, or are different to the ones in s3, it uploads them
+ it uses 3 helper functions to achieve these 3 functionalities
+ """
+ try:
+ db = connect_to_database()
+ existing_files = list_existing_s3_files()
+ any_changes = process_and_upload_tables(db, existing_files)
+
+ if not any_changes:
+ logger.info("No changes detected in the database.")
+ return {
+ 'statusCode': 200,
+ 'body': json.dumps('No changes detected, no CSV files were uploaded.')
+ }
+ else:
+ return {
+ 'statusCode': 200,
+ 'body': json.dumps('CSV files processed and uploaded successfully.')
+ }
+
+ except Exception as e:
+ logger.error(f'Error: {e}')
+ return {
+ 'statusCode': 500,
+ 'body': json.dumps('Internal server error.')
+ }
- database = os.getenv('database')
- user = os.getenv('user')
- password = os.getenv('password')
- host = os.getenv('host')
- port = os.getenv('port')
+ finally:
+
+ if db:
+ db.close()
+def connect_to_database():
try:
- db = Connection(
- database=database,
- user=user,
- password=password,
- host=host,
- port=port
+ return Connection(
+ database=database,
+ user=user,
+ password=password,
+ host=host,
+ port=port
)
except DatabaseError as e:
- print(e)
+ logger.error(f'Database error: {e}')
+ raise
except InterfaceError as i:
- print(i)
- #replace prints with upload to cloudwatch logs
+ logger.error(f'Interface error: {i}')
+ raise
+
+
+def list_existing_s3_files():
+ """Creates a dictionary and populates it with the
+ results of listing the contents of the s3 bucket, then
+ returns the populated dictionary
+ """
+ client = boto3.client('s3')
+ existing_files = {}
+
+ try:
+ response = client.list_objects_v2(Bucket=ingestion_bucket)
+
+ if 'Contents' in response:
+ for obj in response['Contents']:
+ s3_key = obj['Key']
+ try:
+ file_obj = client.get_object(Bucket=ingestion_bucket, Key=s3_key)
+ file_content = file_obj['Body'].read().decode('utf-8')
+ existing_files[s3_key] = file_content
+ except ClientError as e:
+ logger.error(f'Error retrieving S3 object {s3_key}: {e}')
+
+ except ClientError as e:
+ logger.error(f'Error listing S3 objects: {e}')
+
+ return existing_files
+
+
+
+def process_and_upload_tables(db, existing_files):
+ """Creates a list of the tables from a database query and
+ then selects everything from each table in individual queries
+ it then writes each table to CSV files and compares with the item
+ in the existing_files dictionary with the same name. If it finds sny changes
+ to files, or new tables/files it uploads them to the s3 bucket
+ """
+ client = boto3.client('s3')
tables = db.run("SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';")
+
for table in tables:
table_name = table[0]
rows = db.run(f"SELECT * FROM {table_name};")
- # this saves the csv files to the repo root before writing to s3, this is unnecessary. how will the lambda behave when it attempts to save files?
- with open(f"{table_name}.csv", "w", newline='') as file:
+
+
+ csv_file_path = f"/tmp/{table_name}.csv"
+ with open(csv_file_path, "w", newline='') as file:
writer = csv.writer(file)
- writer.writerow([desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")])
+ column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")]
+ writer.writerow(column_names)
writer.writerows(rows)
- try:
- client.upload_file(file, Bucket='ingestion-bucket', Object_name=table_name)
-
- except ClientError as e:
- print(e)
- #replace print with upload to cloudwatch logs
-
- if db:
- db.close()
+
+ s3_key = f"{table_name}/latest.csv"
+ new_csv_content = open(csv_file_path, "r").read()
+
- \ No newline at end of file
+ if s3_key not in existing_files or existing_files[s3_key] != new_csv_content:
+ try:
+ client.upload_file(csv_file_path, ingestion_bucket, s3_key)
+ logger.info(f"Uploaded {s3_key} to S3.")
+ except ClientError as e:
+ logger.error(f'Error uploading to S3: {e}') \ No newline at end of file
git.ajschof.me — hosted by ajschofield — powered by cgit