diff options
| author | bulve-ad <78788030+bulve-ad@users.noreply.github.com> | 2024-08-21 15:51:03 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-08-21 15:51:03 +0100 |
| commit | ce76bbb2b32b58a93d88db4abdb1bbfbf27243ea (patch) | |
| tree | b8e77c62b6a2d50ab04215beb54055d14210a423 /src | |
| parent | c8e94530b65d6807b2b9bb246a542963839cce9d (diff) | |
| parent | d01d3bed939d7a17ea2205af502baeeb35510b5c (diff) | |
| download | de-project-bentley-ce76bbb2b32b58a93d88db4abdb1bbfbf27243ea.tar.gz de-project-bentley-ce76bbb2b32b58a93d88db4abdb1bbfbf27243ea.zip | |
Merge branch 'development' into feature/transform_lambda
Diffstat (limited to 'src')
| -rw-r--r-- | src/extract_lambda.py | 131 | ||||
| -rw-r--r-- | src/load_lambda.py | 2 | ||||
| -rw-r--r-- | src/secrets_manager.py | 48 | ||||
| -rw-r--r-- | src/transform_lambda.py | 1 |
4 files changed, 86 insertions, 96 deletions
diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 4168e27..24f0981 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -1,17 +1,24 @@ -from pg8000.native import Connection, InterfaceError, identifier -import boto3 import csv -from botocore.exceptions import ClientError -import logging import json -from datetime import datetime +import logging import re +from datetime import datetime +from io import StringIO + +import boto3 +from botocore.exceptions import ClientError +from pg8000.native import Connection, InterfaceError, identifier +logger = logging.getLogger(__name__) -logger = logging.getLogger() -logger.setLevel(logging.INFO) +logging.basicConfig( + format="{asctime} - {levelname} - {message}", + style="{", + datefmt="%Y-%m-%d %H:%M", + level=logging.DEBUG, +) -# DB Exception class +logging.getLogger("botocore").setLevel(logging.WARNING) class DBConnectionException(Exception): @@ -28,6 +35,7 @@ def lambda_handler(event, context): and converts all tables to CSV and if any of those tables do not exist in, or are different to the ones in s3, it uploads them it uses 3 helper functions to achieve these 3 functionalities """ + db = None try: db = connect_to_database() existing_files = list_existing_s3_files() @@ -39,38 +47,44 @@ def lambda_handler(event, context): "statusCode": 200, "body": json.dumps("No changes detected, no CSV files were uploaded."), } - else: - return { - "statusCode": 200, - "body": json.dumps( - f"""CSV files processed for {', '.join(any_changes['updated'])} and uploaded successfully.{ - 'The following tables were not updated: '+', '.join(any_changes['no change']) if any_changes['no change'] else ''}""" - ), - } + return { + "statusCode": 200, + "body": json.dumps( + f"""CSV files processed for {', '.join(any_changes['updated'])} and uploaded successfully.{ + 'The following tables were not updated: '+', '.join(any_changes['no change']) if any_changes['no change'] else ''}""" + ), + } except Exception as e: - logger.error(f"Error: {e}") + logger.error(f"Error: {e}", exc_info=True) return {"statusCode": 500, "body": json.dumps("Internal server error.")} finally: if db: db.close() -def retrieve_secrets( - sm_client=boto3.client("secretsmanager"), secret_name="bentley-secrets" -): +def retrieve_secrets(): + secret_name = "bentley-secrets" + region_name = "eu-west-2" + + # Create a Secrets Manager client + session = boto3.session.Session() + client = session.client(service_name="secretsmanager", region_name=region_name) + try: - response = sm_client.get_secret_value(SecretId=secret_name) - if "SecretString" in response: - secret = json.loads(response["SecretString"]) - return secret + get_secret_value_response = client.get_secret_value(SecretId=secret_name) except ClientError as e: - logger.error(f"Could not retrieve secrets: {e}") + logger.error(f"Failed to retrieve secret {secret_name}: {str(e)}") raise e + except KeyError: + logger.error(f"Secret {secret_name} does not contain a SecretString") + raise ValueError(f"Secret {secret_name} does not contain a SecretString") + + return get_secret_value_response["SecretString"] def connect_to_database() -> Connection: try: - secrets = retrieve_secrets() + secrets = json.loads(retrieve_secrets()) host = secrets["host"] port = secrets["port"] user = secrets["user"] @@ -90,6 +104,7 @@ def extract_bucket(client=boto3.client("s3")): extract_bucket_filter = [ bucket["Name"] for bucket in response["Buckets"] if "extract" in bucket["Name"] ] + return extract_bucket_filter[0] @@ -98,7 +113,7 @@ def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client("s3 results of listing the contents of the s3 bucket, then returns the populated dictionary """ - + logging.info("Listing existing S3 files") existing_files = {} try: @@ -115,6 +130,7 @@ def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client("s3 logger.error(f"Error retrieving S3 object {s3_key}: {e}") else: logger.error("The bucket is empty") + return None except ClientError as e: logger.error(f"Error listing S3 objects: {e}") @@ -122,6 +138,21 @@ def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client("s3 return existing_files +def get_latest_timestamp(existing_files): + if existing_files: + all_datetimes = [] + for file_name in existing_files.keys(): + match = re.search(r"\/(.+/).+_(.+)\.csv", file_name) + if match: + datetime_str = "".join(match.group(1, 2)) + all_datetimes.append( + datetime.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S") + ) + return max(all_datetimes) if all_datetimes else datetime.min + + return existing_files + + def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): """Creates a list of the tables from a database query and then selects everything from each table in individual queries @@ -130,29 +161,35 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): to files, or new tables/files it uploads them to the s3 bucket """ load_status = {"updated": [], "no change": []} - # Retrieving the latest file timestamp from S3 extract bucket - all_datetimes = [] - for file_names in existing_files.keys(): - datetime_str_on_s3 = "".join( - re.search(r"\/(.+/).+_(.+)\.csv", file_names).group(1, 2) - ) - all_datetimes.append(datetime.strptime(datetime_str_on_s3, "%Y/%m/%d/%H:%M:%S")) - latest_timestamp = max(all_datetimes) + latest_timestamp = get_latest_timestamp(existing_files) - # Iterating through tables on the database and retrieving only latest changes vs previous file load tables = db.run( """ - SELECT table_name - FROM information_schema.tables - WHERE table_schema='public' AND table_type='BASE TABLE';""" + SELECT table_name + FROM information_schema.tables + WHERE table_schema='public' AND table_name != '_prisma_migrations' + AND table_type='BASE TABLE'; + """ ) + for table in tables: table_name = table[0] - rows = db.run( - f"SELECT * FROM {identifier(table_name)} " "WHERE last_updated >= :latest;", - latest={datetime.strftime(latest_timestamp, "%H-%m-%d %H:%M:%S")}, + base_query = f""" + SELECT * FROM {identifier(table_name)} + WHERE last_updated >= :latest; + """ + latest = ( + { + datetime.strftime( + latest_timestamp if latest_timestamp else datetime(1990, 1, 1), + "%Y-%m-%d %H:%M:%S", + ) + }, ) - + logger.info(f"Processing table: {table_name}") + logger.info(f"Latest timestamp: {latest[0]}") + rows = db.run(base_query, latest=latest) + logger.debug(f"Rows: {rows}") # Creating a temporary file path and writing the column name to it followed by each row of data if rows: csv_file_path = f"/tmp/{table_name}.csv" @@ -182,7 +219,9 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): logger.error(f"Error uploading to S3: {e}") else: load_status["no change"].append(table_name) - logger.info( - f"No new data in {table_name} name. Latest data retrieved is from {latest_timestamp}." - ) + logger.info(f"No new data") return load_status + + +if __name__ == "__main__": + lambda_handler(None, None) diff --git a/src/load_lambda.py b/src/load_lambda.py index 6ee681f..c6a8e60 100644 --- a/src/load_lambda.py +++ b/src/load_lambda.py @@ -1,2 +1,2 @@ def lambda_handler(): - pass
\ No newline at end of file + pass diff --git a/src/secrets_manager.py b/src/secrets_manager.py deleted file mode 100644 index c0fb61e..0000000 --- a/src/secrets_manager.py +++ /dev/null @@ -1,48 +0,0 @@ -import boto3 -from botocore.exceptions import ClientError -import json - - -def sm_client(): - sm_client = boto3.client('secretsmanager') - yield sm_client - -def create_secret(sm_client, secret_name, cohort_id, user, password, host, database, port): - secret = { - "cohort_id": cohort_id, - "user": user, - "password": password, - "host": host, - "database": database, - "port": port - } - - response = sm_client.create_secret( - Name = secret_name, - SecretString = json.dumps(secret) - ) - - print(response) - return response - -def list_secret(sm_client): - response = sm_client.list_secrets() - secret_dict = response['SecretList'] - secret_names = [] - for items in secret_dict: - secret_names.append(items['Name']) - print(f'{len(secret_names)} secret(s) available') - for name in secret_names: - print(name) - return secret_names - -def retrieve_secrets(sm_client): - response = sm_client.get_secrets( - - ) - - - -#retrieve secret -#so lambda can access totesy db -#so lambda connect to the db and then retrieve the data
\ No newline at end of file diff --git a/src/transform_lambda.py b/src/transform_lambda.py index b176ccc..9238180 100644 --- a/src/transform_lambda.py +++ b/src/transform_lambda.py @@ -1,4 +1,3 @@ -# from src.extract_lambda import extract_bucket import json import boto3 import re |
