aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorbulve-ad <78788030+bulve-ad@users.noreply.github.com>2024-08-21 15:51:03 +0100
committerGitHub <noreply@github.com>2024-08-21 15:51:03 +0100
commitce76bbb2b32b58a93d88db4abdb1bbfbf27243ea (patch)
treeb8e77c62b6a2d50ab04215beb54055d14210a423 /src
parentc8e94530b65d6807b2b9bb246a542963839cce9d (diff)
parentd01d3bed939d7a17ea2205af502baeeb35510b5c (diff)
downloadde-project-bentley-ce76bbb2b32b58a93d88db4abdb1bbfbf27243ea.tar.gz
de-project-bentley-ce76bbb2b32b58a93d88db4abdb1bbfbf27243ea.zip
Merge branch 'development' into feature/transform_lambda
Diffstat (limited to 'src')
-rw-r--r--src/extract_lambda.py131
-rw-r--r--src/load_lambda.py2
-rw-r--r--src/secrets_manager.py48
-rw-r--r--src/transform_lambda.py1
4 files changed, 86 insertions, 96 deletions
diff --git a/src/extract_lambda.py b/src/extract_lambda.py
index 4168e27..24f0981 100644
--- a/src/extract_lambda.py
+++ b/src/extract_lambda.py
@@ -1,17 +1,24 @@
-from pg8000.native import Connection, InterfaceError, identifier
-import boto3
import csv
-from botocore.exceptions import ClientError
-import logging
import json
-from datetime import datetime
+import logging
import re
+from datetime import datetime
+from io import StringIO
+
+import boto3
+from botocore.exceptions import ClientError
+from pg8000.native import Connection, InterfaceError, identifier
+logger = logging.getLogger(__name__)
-logger = logging.getLogger()
-logger.setLevel(logging.INFO)
+logging.basicConfig(
+ format="{asctime} - {levelname} - {message}",
+ style="{",
+ datefmt="%Y-%m-%d %H:%M",
+ level=logging.DEBUG,
+)
-# DB Exception class
+logging.getLogger("botocore").setLevel(logging.WARNING)
class DBConnectionException(Exception):
@@ -28,6 +35,7 @@ def lambda_handler(event, context):
and converts all tables to CSV and if any of those tables do not exist in, or are different to the ones in s3, it uploads them
it uses 3 helper functions to achieve these 3 functionalities
"""
+ db = None
try:
db = connect_to_database()
existing_files = list_existing_s3_files()
@@ -39,38 +47,44 @@ def lambda_handler(event, context):
"statusCode": 200,
"body": json.dumps("No changes detected, no CSV files were uploaded."),
}
- else:
- return {
- "statusCode": 200,
- "body": json.dumps(
- f"""CSV files processed for {', '.join(any_changes['updated'])} and uploaded successfully.{
- 'The following tables were not updated: '+', '.join(any_changes['no change']) if any_changes['no change'] else ''}"""
- ),
- }
+ return {
+ "statusCode": 200,
+ "body": json.dumps(
+ f"""CSV files processed for {', '.join(any_changes['updated'])} and uploaded successfully.{
+ 'The following tables were not updated: '+', '.join(any_changes['no change']) if any_changes['no change'] else ''}"""
+ ),
+ }
except Exception as e:
- logger.error(f"Error: {e}")
+ logger.error(f"Error: {e}", exc_info=True)
return {"statusCode": 500, "body": json.dumps("Internal server error.")}
finally:
if db:
db.close()
-def retrieve_secrets(
- sm_client=boto3.client("secretsmanager"), secret_name="bentley-secrets"
-):
+def retrieve_secrets():
+ secret_name = "bentley-secrets"
+ region_name = "eu-west-2"
+
+ # Create a Secrets Manager client
+ session = boto3.session.Session()
+ client = session.client(service_name="secretsmanager", region_name=region_name)
+
try:
- response = sm_client.get_secret_value(SecretId=secret_name)
- if "SecretString" in response:
- secret = json.loads(response["SecretString"])
- return secret
+ get_secret_value_response = client.get_secret_value(SecretId=secret_name)
except ClientError as e:
- logger.error(f"Could not retrieve secrets: {e}")
+ logger.error(f"Failed to retrieve secret {secret_name}: {str(e)}")
raise e
+ except KeyError:
+ logger.error(f"Secret {secret_name} does not contain a SecretString")
+ raise ValueError(f"Secret {secret_name} does not contain a SecretString")
+
+ return get_secret_value_response["SecretString"]
def connect_to_database() -> Connection:
try:
- secrets = retrieve_secrets()
+ secrets = json.loads(retrieve_secrets())
host = secrets["host"]
port = secrets["port"]
user = secrets["user"]
@@ -90,6 +104,7 @@ def extract_bucket(client=boto3.client("s3")):
extract_bucket_filter = [
bucket["Name"] for bucket in response["Buckets"] if "extract" in bucket["Name"]
]
+
return extract_bucket_filter[0]
@@ -98,7 +113,7 @@ def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client("s3
results of listing the contents of the s3 bucket, then
returns the populated dictionary
"""
-
+ logging.info("Listing existing S3 files")
existing_files = {}
try:
@@ -115,6 +130,7 @@ def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client("s3
logger.error(f"Error retrieving S3 object {s3_key}: {e}")
else:
logger.error("The bucket is empty")
+ return None
except ClientError as e:
logger.error(f"Error listing S3 objects: {e}")
@@ -122,6 +138,21 @@ def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client("s3
return existing_files
+def get_latest_timestamp(existing_files):
+ if existing_files:
+ all_datetimes = []
+ for file_name in existing_files.keys():
+ match = re.search(r"\/(.+/).+_(.+)\.csv", file_name)
+ if match:
+ datetime_str = "".join(match.group(1, 2))
+ all_datetimes.append(
+ datetime.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S")
+ )
+ return max(all_datetimes) if all_datetimes else datetime.min
+
+ return existing_files
+
+
def process_and_upload_tables(db, existing_files, client=boto3.client("s3")):
"""Creates a list of the tables from a database query and
then selects everything from each table in individual queries
@@ -130,29 +161,35 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")):
to files, or new tables/files it uploads them to the s3 bucket
"""
load_status = {"updated": [], "no change": []}
- # Retrieving the latest file timestamp from S3 extract bucket
- all_datetimes = []
- for file_names in existing_files.keys():
- datetime_str_on_s3 = "".join(
- re.search(r"\/(.+/).+_(.+)\.csv", file_names).group(1, 2)
- )
- all_datetimes.append(datetime.strptime(datetime_str_on_s3, "%Y/%m/%d/%H:%M:%S"))
- latest_timestamp = max(all_datetimes)
+ latest_timestamp = get_latest_timestamp(existing_files)
- # Iterating through tables on the database and retrieving only latest changes vs previous file load
tables = db.run(
"""
- SELECT table_name
- FROM information_schema.tables
- WHERE table_schema='public' AND table_type='BASE TABLE';"""
+ SELECT table_name
+ FROM information_schema.tables
+ WHERE table_schema='public' AND table_name != '_prisma_migrations'
+ AND table_type='BASE TABLE';
+ """
)
+
for table in tables:
table_name = table[0]
- rows = db.run(
- f"SELECT * FROM {identifier(table_name)} " "WHERE last_updated >= :latest;",
- latest={datetime.strftime(latest_timestamp, "%H-%m-%d %H:%M:%S")},
+ base_query = f"""
+ SELECT * FROM {identifier(table_name)}
+ WHERE last_updated >= :latest;
+ """
+ latest = (
+ {
+ datetime.strftime(
+ latest_timestamp if latest_timestamp else datetime(1990, 1, 1),
+ "%Y-%m-%d %H:%M:%S",
+ )
+ },
)
-
+ logger.info(f"Processing table: {table_name}")
+ logger.info(f"Latest timestamp: {latest[0]}")
+ rows = db.run(base_query, latest=latest)
+ logger.debug(f"Rows: {rows}")
# Creating a temporary file path and writing the column name to it followed by each row of data
if rows:
csv_file_path = f"/tmp/{table_name}.csv"
@@ -182,7 +219,9 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")):
logger.error(f"Error uploading to S3: {e}")
else:
load_status["no change"].append(table_name)
- logger.info(
- f"No new data in {table_name} name. Latest data retrieved is from {latest_timestamp}."
- )
+ logger.info(f"No new data")
return load_status
+
+
+if __name__ == "__main__":
+ lambda_handler(None, None)
diff --git a/src/load_lambda.py b/src/load_lambda.py
index 6ee681f..c6a8e60 100644
--- a/src/load_lambda.py
+++ b/src/load_lambda.py
@@ -1,2 +1,2 @@
def lambda_handler():
- pass \ No newline at end of file
+ pass
diff --git a/src/secrets_manager.py b/src/secrets_manager.py
deleted file mode 100644
index c0fb61e..0000000
--- a/src/secrets_manager.py
+++ /dev/null
@@ -1,48 +0,0 @@
-import boto3
-from botocore.exceptions import ClientError
-import json
-
-
-def sm_client():
- sm_client = boto3.client('secretsmanager')
- yield sm_client
-
-def create_secret(sm_client, secret_name, cohort_id, user, password, host, database, port):
- secret = {
- "cohort_id": cohort_id,
- "user": user,
- "password": password,
- "host": host,
- "database": database,
- "port": port
- }
-
- response = sm_client.create_secret(
- Name = secret_name,
- SecretString = json.dumps(secret)
- )
-
- print(response)
- return response
-
-def list_secret(sm_client):
- response = sm_client.list_secrets()
- secret_dict = response['SecretList']
- secret_names = []
- for items in secret_dict:
- secret_names.append(items['Name'])
- print(f'{len(secret_names)} secret(s) available')
- for name in secret_names:
- print(name)
- return secret_names
-
-def retrieve_secrets(sm_client):
- response = sm_client.get_secrets(
-
- )
-
-
-
-#retrieve secret
-#so lambda can access totesy db
-#so lambda connect to the db and then retrieve the data \ No newline at end of file
diff --git a/src/transform_lambda.py b/src/transform_lambda.py
index b176ccc..9238180 100644
--- a/src/transform_lambda.py
+++ b/src/transform_lambda.py
@@ -1,4 +1,3 @@
-# from src.extract_lambda import extract_bucket
import json
import boto3
import re
git.ajschof.me — hosted by ajschofield — powered by cgit