aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorAlex <git@ajschof.me>2024-08-19 12:09:25 +0100
committerGitHub <noreply@github.com>2024-08-19 12:09:25 +0100
commitf28e4038d20b4630fafcae9a7825794e529bace2 (patch)
tree0c378561e0dde843c0a281c692d137bb6bb0d0a7 /src
parent5cc511d2afeea262db0db7039c8f83c123da77ea (diff)
parent09b8b7903098a988a9a022d0ab607f8131c9c78f (diff)
downloadde-project-bentley-f28e4038d20b4630fafcae9a7825794e529bace2.tar.gz
de-project-bentley-f28e4038d20b4630fafcae9a7825794e529bace2.zip
Merge branch 'development' into feature/test-extract-lambda
Diffstat (limited to 'src')
-rw-r--r--src/extract_lambda.py196
-rw-r--r--src/load_lambda.py2
-rw-r--r--src/secrets_manager.py31
-rw-r--r--src/transform_lambda.py2
4 files changed, 132 insertions, 99 deletions
diff --git a/src/extract_lambda.py b/src/extract_lambda.py
index fb2d7e8..4168e27 100644
--- a/src/extract_lambda.py
+++ b/src/extract_lambda.py
@@ -1,5 +1,4 @@
-from pg8000.native import Connection, DatabaseError, InterfaceError
-from dotenv import dotenv_values
+from pg8000.native import Connection, InterfaceError, identifier
import boto3
import csv
from botocore.exceptions import ClientError
@@ -12,6 +11,8 @@ import re
logger = logging.getLogger()
logger.setLevel(logging.INFO)
+# DB Exception class
+
class DBConnectionException(Exception):
"""Wraps pg8000.native Error or DatabaseError."""
@@ -21,136 +22,167 @@ class DBConnectionException(Exception):
self.message = str(e)
super().__init__(self.message)
+
def lambda_handler(event, context):
"""This lambda function connects to the Totesys database, lists the contents of the ingestion bucket,
- and converts all tables to CSV and if any of those tables do not exist in, or are different to the ones in s3, it uploads them
- it uses 3 helper functions to achieve these 3 functionalities
+ and converts all tables to CSV and if any of those tables do not exist in, or are different to the ones in s3, it uploads them
+ it uses 3 helper functions to achieve these 3 functionalities
"""
try:
db = connect_to_database()
existing_files = list_existing_s3_files()
any_changes = process_and_upload_tables(db, existing_files)
-
- if not any_changes:
+
+ if not any_changes["updated"]:
logger.info("No changes detected in the database.")
return {
- 'statusCode': 200,
- 'body': json.dumps('No changes detected, no CSV files were uploaded.')
+ "statusCode": 200,
+ "body": json.dumps("No changes detected, no CSV files were uploaded."),
}
else:
return {
- 'statusCode': 200,
- 'body': json.dumps('CSV files processed and uploaded successfully.')
+ "statusCode": 200,
+ "body": json.dumps(
+ f"""CSV files processed for {', '.join(any_changes['updated'])} and uploaded successfully.{
+ 'The following tables were not updated: '+', '.join(any_changes['no change']) if any_changes['no change'] else ''}"""
+ ),
}
-
except Exception as e:
- logger.error(f'Error: {e}')
- return {
- 'statusCode': 500,
- 'body': json.dumps('Internal server error.')
- }
-
+ logger.error(f"Error: {e}")
+ return {"statusCode": 500, "body": json.dumps("Internal server error.")}
finally:
-
if db:
db.close()
-def get_config(path: str = ".env") -> dict:
- return dotenv_values(path)
+
+def retrieve_secrets(
+ sm_client=boto3.client("secretsmanager"), secret_name="bentley-secrets"
+):
+ try:
+ response = sm_client.get_secret_value(SecretId=secret_name)
+ if "SecretString" in response:
+ secret = json.loads(response["SecretString"])
+ return secret
+ except ClientError as e:
+ logger.error(f"Could not retrieve secrets: {e}")
+ raise e
def connect_to_database() -> Connection:
try:
- config = get_config()
- host = config["host"]
- port = config["port"]
- user = config["user"]
- password = config["password"]
- database = config["database"]
+ secrets = retrieve_secrets()
+ host = secrets["host"]
+ port = secrets["port"]
+ user = secrets["user"]
+ password = secrets["password"]
+ database = secrets["database"]
return Connection(
- database=database,
- user=user,
- password=password,
- host=host,
- port=port
+ database=database, user=user, password=password, host=host, port=port
)
except InterfaceError as i:
- logger.error(f'Interface error: {i}')
+ logger.error(f"Interface error: {i}")
raise DBConnectionException("Failed to connect to database")
+def extract_bucket(client=boto3.client("s3")):
+ response = client.list_buckets()
+ extract_bucket_filter = [
+ bucket["Name"] for bucket in response["Buckets"] if "extract" in bucket["Name"]
+ ]
+ return extract_bucket_filter[0]
-def list_existing_s3_files(bucket_name='extract_bucket', client=boto3.client('s3')):
- """Creates a dictionary and populates it with the
- results of listing the contents of the s3 bucket, then
- returns the populated dictionary
+
+def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client("s3")):
+ """Creates a dictionary and populates it with the
+ results of listing the contents of the s3 bucket, then
+ returns the populated dictionary
"""
-
+
existing_files = {}
-
+
try:
- response = client.list_objects_v2(Bucket='extract_bucket')
-
- if 'Contents' in response:
- for obj in response['Contents']:
- s3_key = obj['Key']
+ response = client.list_objects_v2(Bucket=bucket_name)
+
+ if "Contents" in response:
+ for obj in response["Contents"]:
+ s3_key = obj["Key"]
try:
file_obj = client.get_object(Bucket=bucket_name, Key=s3_key)
- file_content = file_obj['Body'].read().decode('utf-8')
+ file_content = file_obj["Body"].read().decode("utf-8")
existing_files[s3_key] = file_content
except ClientError as e:
- logger.error(f'Error retrieving S3 object {s3_key}: {e}')
+ logger.error(f"Error retrieving S3 object {s3_key}: {e}")
else:
- logger.error('The bucket is empty')
-
+ logger.error("The bucket is empty")
+
except ClientError as e:
- logger.error(f'Error listing S3 objects: {e}')
-
- return existing_files
+ logger.error(f"Error listing S3 objects: {e}")
+ return existing_files
-def process_and_upload_tables(db, existing_files, client=boto3.client('s3')):
- """Creates a list of the tables from a database query and
- then selects everything from each table in individual queries
- it then writes each table to CSV files and compares with the item
- in the existing_files dictionary with the same name. If it finds any changes
- to files, or new tables/files it uploads them to the s3 bucket
+def process_and_upload_tables(db, existing_files, client=boto3.client("s3")):
+ """Creates a list of the tables from a database query and
+ then selects everything from each table in individual queries
+ it then writes each table to CSV files and compares with the item
+ in the existing_files dictionary with the same name. If it finds any changes
+ to files, or new tables/files it uploads them to the s3 bucket
"""
- ## NEW CODE
+ load_status = {"updated": [], "no change": []}
+ # Retrieving the latest file timestamp from S3 extract bucket
all_datetimes = []
for file_names in existing_files.keys():
- datetime_str_on_s3 = ''.join(re.search(r'\/(.+/).+_(.+)\.csv',file_names).group(1,2))
- all_datetimes.append(datetime.strptime(datetime_str_on_s3, '%Y/%m/%d/%H:%M:%S'))
+ datetime_str_on_s3 = "".join(
+ re.search(r"\/(.+/).+_(.+)\.csv", file_names).group(1, 2)
+ )
+ all_datetimes.append(datetime.strptime(datetime_str_on_s3, "%Y/%m/%d/%H:%M:%S"))
latest_timestamp = max(all_datetimes)
- ## END OF NEW CODE
- tables = db.run("SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';")
- print(tables)
+ # Iterating through tables on the database and retrieving only latest changes vs previous file load
+ tables = db.run(
+ """
+ SELECT table_name
+ FROM information_schema.tables
+ WHERE table_schema='public' AND table_type='BASE TABLE';"""
+ )
for table in tables:
table_name = table[0]
- rows = db.run(f"SELECT * FROM {table_name};")
-
-
- csv_file_path = f"/tmp/{table_name}.csv"
- with open(csv_file_path, "w", newline='') as file:
- writer = csv.writer(file)
- #column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")]
- column_names = [col_name[0] for col_name in db.run(f"SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = '{table_name}';")]
- writer.writerow(column_names)
- writer.writerows(rows)
- s3_key = datetime.strftime(datetime.today(),f'{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv')
- new_csv_content = open(csv_file_path, "r").read()
- ## NEW CODE
- latest_s3_object_key = datetime.strftime(latest_timestamp,f'{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv')
- ## END OF NEW CODE
- if existing_files[latest_s3_object_key] != new_csv_content:
+ rows = db.run(
+ f"SELECT * FROM {identifier(table_name)} " "WHERE last_updated >= :latest;",
+ latest={datetime.strftime(latest_timestamp, "%H-%m-%d %H:%M:%S")},
+ )
+
+ # Creating a temporary file path and writing the column name to it followed by each row of data
+ if rows:
+ csv_file_path = f"/tmp/{table_name}.csv"
+ with open(csv_file_path, "w", newline="") as file:
+ writer = csv.writer(file)
+ # column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")]
+ column_names = [
+ col_name[0]
+ for col_name in db.run(
+ """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS
+ WHERE table_name = :table ;""",
+ table=table_name,
+ )
+ ]
+ writer.writerow(column_names)
+ writer.writerows(rows)
+ s3_key = datetime.strftime(
+ datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv"
+ )
+
+ # Writing the new file to S3 extract bucket:
try:
- client.upload_file(csv_file_path, 'extract_bucket', s3_key)
+ client.upload_file(csv_file_path, extract_bucket(), s3_key)
+ load_status["updated"].append(table_name)
logger.info(f"Uploaded {s3_key} to S3.")
except ClientError as e:
- logger.error(f'Error uploading to S3: {e}')
+ logger.error(f"Error uploading to S3: {e}")
else:
- logger.info(f"No new data.")
- \ No newline at end of file
+ load_status["no change"].append(table_name)
+ logger.info(
+ f"No new data in {table_name} name. Latest data retrieved is from {latest_timestamp}."
+ )
+ return load_status
diff --git a/src/load_lambda.py b/src/load_lambda.py
index 6ee681f..c6a8e60 100644
--- a/src/load_lambda.py
+++ b/src/load_lambda.py
@@ -1,2 +1,2 @@
def lambda_handler():
- pass \ No newline at end of file
+ pass
diff --git a/src/secrets_manager.py b/src/secrets_manager.py
index c0fb61e..3484688 100644
--- a/src/secrets_manager.py
+++ b/src/secrets_manager.py
@@ -4,45 +4,46 @@ import json
def sm_client():
- sm_client = boto3.client('secretsmanager')
+ sm_client = boto3.client("secretsmanager")
yield sm_client
-def create_secret(sm_client, secret_name, cohort_id, user, password, host, database, port):
+
+def create_secret(
+ sm_client, secret_name, cohort_id, user, password, host, database, port
+):
secret = {
"cohort_id": cohort_id,
"user": user,
"password": password,
"host": host,
"database": database,
- "port": port
+ "port": port,
}
response = sm_client.create_secret(
- Name = secret_name,
- SecretString = json.dumps(secret)
+ Name=secret_name, SecretString=json.dumps(secret)
)
print(response)
return response
+
def list_secret(sm_client):
response = sm_client.list_secrets()
- secret_dict = response['SecretList']
+ secret_dict = response["SecretList"]
secret_names = []
for items in secret_dict:
- secret_names.append(items['Name'])
- print(f'{len(secret_names)} secret(s) available')
+ secret_names.append(items["Name"])
+ print(f"{len(secret_names)} secret(s) available")
for name in secret_names:
print(name)
return secret_names
-def retrieve_secrets(sm_client):
- response = sm_client.get_secrets(
-
- )
+def retrieve_secrets(sm_client):
+ response = sm_client.get_secrets()
-#retrieve secret
-#so lambda can access totesy db
-#so lambda connect to the db and then retrieve the data \ No newline at end of file
+# retrieve secret
+# so lambda can access totesy db
+# so lambda connect to the db and then retrieve the data
diff --git a/src/transform_lambda.py b/src/transform_lambda.py
index 6ee681f..c6a8e60 100644
--- a/src/transform_lambda.py
+++ b/src/transform_lambda.py
@@ -1,2 +1,2 @@
def lambda_handler():
- pass \ No newline at end of file
+ pass
git.ajschof.me — hosted by ajschofield — powered by cgit