From b9f3576771c8af8933d23e95f7863f63e2bbc6aa Mon Sep 17 00:00:00 2001 From: lian-manonog Date: Mon, 19 Aug 2024 15:43:28 +0100 Subject: wip: fixed broken tests; hashed out test_error_retrieving_object --- src/extract_lambda.py | 1 + 1 file changed, 1 insertion(+) (limited to 'src/extract_lambda.py') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 4168e27..217efdb 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -90,6 +90,7 @@ def extract_bucket(client=boto3.client("s3")): extract_bucket_filter = [ bucket["Name"] for bucket in response["Buckets"] if "extract" in bucket["Name"] ] + return extract_bucket_filter[0] -- cgit v1.2.3 From c3c45c0d133ce32d48f1c72a0ac54f291038b1e7 Mon Sep 17 00:00:00 2001 From: Ellie Date: Mon, 19 Aug 2024 15:56:48 +0100 Subject: wip: fixing last test --- src/extract_lambda.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'src/extract_lambda.py') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 4168e27..533bf82 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -147,12 +147,13 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): WHERE table_schema='public' AND table_type='BASE TABLE';""" ) for table in tables: + print(tables) table_name = table[0] rows = db.run( f"SELECT * FROM {identifier(table_name)} " "WHERE last_updated >= :latest;", latest={datetime.strftime(latest_timestamp, "%H-%m-%d %H:%M:%S")}, ) - + print('rows', rows) # Creating a temporary file path and writing the column name to it followed by each row of data if rows: csv_file_path = f"/tmp/{table_name}.csv" @@ -183,6 +184,6 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): else: load_status["no change"].append(table_name) logger.info( - f"No new data in {table_name} name. Latest data retrieved is from {latest_timestamp}." + f"No new data" ) return load_status -- cgit v1.2.3 From 982b8fa318c9065bd9037d14c56abcd126252978 Mon Sep 17 00:00:00 2001 From: Ellie Date: Mon, 19 Aug 2024 16:33:26 +0100 Subject: add working process and upload tables test --- src/extract_lambda.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/extract_lambda.py') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 533bf82..5a5a631 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -150,8 +150,8 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): print(tables) table_name = table[0] rows = db.run( - f"SELECT * FROM {identifier(table_name)} " "WHERE last_updated >= :latest;", - latest={datetime.strftime(latest_timestamp, "%H-%m-%d %H:%M:%S")}, + f"SELECT * FROM {identifier(table_name)} WHERE last_updated >= :latest;", + latest={datetime.strftime(latest_timestamp, "%Y-%m-%d %H:%M:%S")}, ) print('rows', rows) # Creating a temporary file path and writing the column name to it followed by each row of data -- cgit v1.2.3 From a42d030fb663ad7eb040498cfc5f0627a27d6cc6 Mon Sep 17 00:00:00 2001 From: "deepsource-autofix[bot]" <62050782+deepsource-autofix[bot]@users.noreply.github.com> Date: Mon, 19 Aug 2024 16:11:44 +0000 Subject: style: format code with Autopep8, Black and Ruff Formatter This commit fixes the style issues introduced in 4f629e5 according to the output from Autopep8, Black and Ruff Formatter. Details: https://github.com/ajschofield/de-project-bentley/pull/65 --- src/extract_lambda.py | 8 +++----- tests/test_extract_lambda.py | 34 ++++++++++++++++++++-------------- 2 files changed, 23 insertions(+), 19 deletions(-) (limited to 'src/extract_lambda.py') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 5a5a631..9b17ef2 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -151,9 +151,9 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): table_name = table[0] rows = db.run( f"SELECT * FROM {identifier(table_name)} WHERE last_updated >= :latest;", - latest={datetime.strftime(latest_timestamp, "%Y-%m-%d %H:%M:%S")}, + latest={datetime.strftime(latest_timestamp, "%Y-%m-%d %H:%M:%S")}, ) - print('rows', rows) + print("rows", rows) # Creating a temporary file path and writing the column name to it followed by each row of data if rows: csv_file_path = f"/tmp/{table_name}.csv" @@ -183,7 +183,5 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): logger.error(f"Error uploading to S3: {e}") else: load_status["no change"].append(table_name) - logger.info( - f"No new data" - ) + logger.info(f"No new data") return load_status diff --git a/tests/test_extract_lambda.py b/tests/test_extract_lambda.py index 3405743..5a1c5b2 100644 --- a/tests/test_extract_lambda.py +++ b/tests/test_extract_lambda.py @@ -12,7 +12,7 @@ from src.extract_lambda import ( DBConnectionException, lambda_handler, process_and_upload_tables, - retrieve_secrets + retrieve_secrets, ) @@ -25,7 +25,9 @@ def mock_config(): "password": "password", "database": "db", } - with patch("src.extract_lambda.retrieve_secrets", return_value=env_vars) as mock_config: + with patch( + "src.extract_lambda.retrieve_secrets", return_value=env_vars + ) as mock_config: yield mock_config @@ -185,31 +187,35 @@ class TestProcessAndUploadTables: queries = [ "SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';", "SELECT * FROM Fruits WHERE last_updated > :latest;", - "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = 'Fruits';" + "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = 'Fruits';", ] return_values = [ - [['Fruits']], + [["Fruits"]], [], # No new rows with a more recent last_updated timestamp - [['Food_type'], ['Flavour'], ['Colour'], ['last_updated']] + [["Food_type"], ["Flavour"], ["Colour"], ["last_updated"]], ] vals = dict(zip(queries, return_values)) # Patch the database connection and set return values for queries - with patch('src.extract_lambda.Connection') as mock_db: + with patch("src.extract_lambda.Connection") as mock_db: mock_db().run.side_effect = return_values - s3_key = 'Fruits/2024/08/15/Fruits_16:46:30.csv' + s3_key = "Fruits/2024/08/15/Fruits_16:46:30.csv" existing_files = { - s3_key: 'Food_type,Flavour,Colour,last_updated\nVegetable,Sour,Green,2022-11-03 14:20:49.962\nBerry,Sweet,Red,2022-11-03 14:20:49.962' + s3_key: "Food_type,Flavour,Colour,last_updated\nVegetable,Sour,Green,2022-11-03 14:20:49.962\nBerry,Sweet,Red,2022-11-03 14:20:49.962" } # Simulate S3 bucket and file setup - s3_client.create_bucket(Bucket='test_extract_bucket', - CreateBucketConfiguration={'LocationConstraint': 'eu-west-2'}) - s3_client.upload_file('tests/dummy_identical.csv', 'test_extract_bucket', s3_key) - + s3_client.create_bucket( + Bucket="test_extract_bucket", + CreateBucketConfiguration={"LocationConstraint": "eu-west-2"}, + ) + s3_client.upload_file( + "tests/dummy_identical.csv", "test_extract_bucket", s3_key + ) + # Run the process_and_upload_tables function process_and_upload_tables(mock_db(), existing_files, client=s3_client) # Assert that the log contains "No new data" - assert 'No new data' in caplog.text + assert "No new data" in caplog.text - # process and upload tables needs more tests \ No newline at end of file + # process and upload tables needs more tests -- cgit v1.2.3 From 88e71818aaf1bf67e4d2807d22d8122b7bf184f1 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Mon, 19 Aug 2024 22:20:21 +0100 Subject: refactor(log): implement logging ancestry - avoid using root logger --- src/extract_lambda.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/extract_lambda.py') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 15fe785..6f841b4 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -8,7 +8,7 @@ from datetime import datetime import re -logger = logging.getLogger() +logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) # DB Exception class -- cgit v1.2.3 From 84b3dea3833ae65d53a1007567ee19c31bf34ee3 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Mon, 19 Aug 2024 22:28:31 +0100 Subject: refactor(retrieve_secrets): use aws recommended method for retrieving secrets --- src/extract_lambda.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) (limited to 'src/extract_lambda.py') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 6f841b4..1df4c34 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -55,18 +55,21 @@ def lambda_handler(event, context): db.close() -def retrieve_secrets( - sm_client=boto3.client("secretsmanager"), secret_name="bentley-secrets" -): +def retrieve_secrets(): + secret_name = "bentley-secrets" + region_name = "eu-west-2" + + # Create a Secrets Manager client + session = boto3.session.Session() + client = session.client(service_name="secretsmanager", region_name=region_name) + try: - response = sm_client.get_secret_value(SecretId=secret_name) - if "SecretString" in response: - secret = json.loads(response["SecretString"]) - return secret + get_secret_value_response = client.get_secret_value(SecretId=secret_name) except ClientError as e: - logger.error(f"Could not retrieve secrets: {e}") raise e + return get_secret_value_response["SecretString"] + def connect_to_database() -> Connection: try: -- cgit v1.2.3 From 3d4d74aa69db85e3c840b3b73c028f4e9f83d1f7 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Mon, 19 Aug 2024 22:29:41 +0100 Subject: refactor(lambda_handler): remove unnecessary else statement --- src/extract_lambda.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) (limited to 'src/extract_lambda.py') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 1df4c34..99117a4 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -39,14 +39,13 @@ def lambda_handler(event, context): "statusCode": 200, "body": json.dumps("No changes detected, no CSV files were uploaded."), } - else: - return { - "statusCode": 200, - "body": json.dumps( - f"""CSV files processed for {', '.join(any_changes['updated'])} and uploaded successfully.{ - 'The following tables were not updated: '+', '.join(any_changes['no change']) if any_changes['no change'] else ''}""" - ), - } + return { + "statusCode": 200, + "body": json.dumps( + f"""CSV files processed for {', '.join(any_changes['updated'])} and uploaded successfully.{ + 'The following tables were not updated: '+', '.join(any_changes['no change']) if any_changes['no change'] else ''}""" + ), + } except Exception as e: logger.error(f"Error: {e}") return {"statusCode": 500, "body": json.dumps("Internal server error.")} -- cgit v1.2.3 From 4699b3506307cb8556a7cc5f12fbe4df7a5c9a6b Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Mon, 19 Aug 2024 22:31:58 +0100 Subject: refactor(retrieve_secrets): improve error handling when retrieving secrets --- src/extract_lambda.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'src/extract_lambda.py') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 99117a4..63a80ce 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -66,6 +66,9 @@ def retrieve_secrets(): get_secret_value_response = client.get_secret_value(SecretId=secret_name) except ClientError as e: raise e + except KeyError: + logger.error(f"Secret {secret_name} does not contain a SecretString") + raise ValueError(f"Secret {secret_name} does not contain a SecretString") return get_secret_value_response["SecretString"] -- cgit v1.2.3 From 8353621c862e75d1573ff8338852aa7d54d5d2e8 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Mon, 19 Aug 2024 22:36:37 +0100 Subject: refactor(retrieve_secrets): add logging for ClientError --- src/extract_lambda.py | 1 + 1 file changed, 1 insertion(+) (limited to 'src/extract_lambda.py') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 63a80ce..485c021 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -65,6 +65,7 @@ def retrieve_secrets(): try: get_secret_value_response = client.get_secret_value(SecretId=secret_name) except ClientError as e: + logger.error(f"Failed to retrieve secret {secret_name}: {str(e)}") raise e except KeyError: logger.error(f"Secret {secret_name} does not contain a SecretString") -- cgit v1.2.3 From bcbadd508dbc1a53864e64cb1e2eccce53daa187 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Mon, 19 Aug 2024 22:37:43 +0100 Subject: chore: reorganise imports in extract_lambda --- src/extract_lambda.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'src/extract_lambda.py') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 485c021..8353481 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -1,12 +1,12 @@ -from pg8000.native import Connection, InterfaceError, identifier -import boto3 import csv -from botocore.exceptions import ClientError -import logging import json -from datetime import datetime +import logging import re +from datetime import datetime +import boto3 +from botocore.exceptions import ClientError +from pg8000.native import Connection, InterfaceError, identifier logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) -- cgit v1.2.3 From caed81dc699b9b4105da2b8924310f1a370217c7 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Mon, 19 Aug 2024 23:13:39 +0100 Subject: refactor: add timestamp function in extract_lambda.py --- src/extract_lambda.py | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) (limited to 'src/extract_lambda.py') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 8353481..ad3c970 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -129,6 +129,16 @@ def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client("s3 return existing_files +def get_latest_timestamp(existing_files): + all_datetimes = [] + for file_name in existing_files.keys(): + match = re.search(r"\/(.+/).+_(.+)\.csv", file_name) + if match: + datetime_str = "".join(match.group(1, 2)) + all_datetimes.append(datetime.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S")) + return max(all_datetimes) if all_datetimes else datetime.min + + def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): """Creates a list of the tables from a database query and then selects everything from each table in individual queries @@ -137,22 +147,17 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): to files, or new tables/files it uploads them to the s3 bucket """ load_status = {"updated": [], "no change": []} - # Retrieving the latest file timestamp from S3 extract bucket - all_datetimes = [] - for file_names in existing_files.keys(): - datetime_str_on_s3 = "".join( - re.search(r"\/(.+/).+_(.+)\.csv", file_names).group(1, 2) - ) - all_datetimes.append(datetime.strptime(datetime_str_on_s3, "%Y/%m/%d/%H:%M:%S")) - latest_timestamp = max(all_datetimes) + latest_timestamp = get_latest_timestamp(existing_files) - # Iterating through tables on the database and retrieving only latest changes vs previous file load tables = db.run( """ - SELECT table_name - FROM information_schema.tables - WHERE table_schema='public' AND table_type='BASE TABLE';""" + SELECT table_name + FROM information_schema.tables + WHERE table_schema='public' + AND table_type='BASE TABLE'; + """ ) + for table in tables: print(tables) table_name = table[0] -- cgit v1.2.3 From 610d23e7ed0f39e5ecb0dd25c3a1e3cba20d662e Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Mon, 19 Aug 2024 23:26:58 +0100 Subject: refactor: remove print statements in process_and_upload_tables --- src/extract_lambda.py | 2 -- 1 file changed, 2 deletions(-) (limited to 'src/extract_lambda.py') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index ad3c970..7c6c3d1 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -159,13 +159,11 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): ) for table in tables: - print(tables) table_name = table[0] rows = db.run( f"SELECT * FROM {identifier(table_name)} WHERE last_updated >= :latest;", latest={datetime.strftime(latest_timestamp, "%Y-%m-%d %H:%M:%S")}, ) - print("rows", rows) # Creating a temporary file path and writing the column name to it followed by each row of data if rows: csv_file_path = f"/tmp/{table_name}.csv" -- cgit v1.2.3 From 5be3b130170c82360ff9715f5c09b9e815fc16f4 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Mon, 19 Aug 2024 23:32:25 +0100 Subject: feat: use buffers for s3 upload instead of csv files --- src/extract_lambda.py | 50 +++++++++++++++++++++++++++++++++----------------- 1 file changed, 33 insertions(+), 17 deletions(-) (limited to 'src/extract_lambda.py') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 7c6c3d1..f38e24a 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -3,6 +3,7 @@ import json import logging import re from datetime import datetime +from io import StringIO import boto3 from botocore.exceptions import ClientError @@ -139,6 +140,26 @@ def get_latest_timestamp(existing_files): return max(all_datetimes) if all_datetimes else datetime.min +def stream_to_s3(table_name, rows, column_names, s3_client, bucket_name, s3_key): + csv_buffer = StringIO() + csv_writer = csv.writer(csv_buffer) + + csv_writer.writerow(column_names) + + for row in rows: + csv_writer.writerow(row) + + if csv_buffer.tell() > 5 * 1024 * 1024: + csv_buffer.seek(0) + s3_client.upload_fileobj(csv_buffer, bucket_name, s3_key) + csv_buffer.truncate(0) + csv_buffer.seek(0) + + if csv_buffer.tell() > 0: + csv_buffer.seek(0) + s3_client.upload_fileobj(csv_buffer, bucket_name, s3_key) + + def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): """Creates a list of the tables from a database query and then selects everything from each table in individual queries @@ -164,29 +185,24 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): f"SELECT * FROM {identifier(table_name)} WHERE last_updated >= :latest;", latest={datetime.strftime(latest_timestamp, "%Y-%m-%d %H:%M:%S")}, ) - # Creating a temporary file path and writing the column name to it followed by each row of data if rows: - csv_file_path = f"/tmp/{table_name}.csv" - with open(csv_file_path, "w", newline="") as file: - writer = csv.writer(file) - # column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")] - column_names = [ - col_name[0] - for col_name in db.run( - """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS - WHERE table_name = :table ;""", - table=table_name, - ) - ] - writer.writerow(column_names) - writer.writerows(rows) + column_names = [ + col_name[0] + for col_name in db.run( + """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS + WHERE table_name = :table ;""", + table=table_name, + ) + ] + s3_key = datetime.strftime( datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv" ) - # Writing the new file to S3 extract bucket: try: - client.upload_file(csv_file_path, extract_bucket(), s3_key) + stream_to_s3( + table_name, rows, column_names, client, extract_bucket(), s3_key + ) load_status["updated"].append(table_name) logger.info(f"Uploaded {s3_key} to S3.") except ClientError as e: -- cgit v1.2.3 From 3e80acb28eeeb0eaff97c2363124a8c6e95bcb13 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Mon, 19 Aug 2024 23:44:52 +0100 Subject: refactor: optimise s3 streaming & file naming --- src/extract_lambda.py | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) (limited to 'src/extract_lambda.py') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index f38e24a..8575b08 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -149,15 +149,9 @@ def stream_to_s3(table_name, rows, column_names, s3_client, bucket_name, s3_key) for row in rows: csv_writer.writerow(row) - if csv_buffer.tell() > 5 * 1024 * 1024: - csv_buffer.seek(0) - s3_client.upload_fileobj(csv_buffer, bucket_name, s3_key) - csv_buffer.truncate(0) - csv_buffer.seek(0) + csv_buffer.seek(0) - if csv_buffer.tell() > 0: - csv_buffer.seek(0) - s3_client.upload_fileobj(csv_buffer, bucket_name, s3_key) + s3_client.upload_fileobj(csv_buffer, bucket_name, s3_key) def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): @@ -190,13 +184,14 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): col_name[0] for col_name in db.run( """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS - WHERE table_name = :table ;""", + WHERE table_name = :table ;""", table=table_name, ) ] - s3_key = datetime.strftime( - datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv" + s3_key = ( + f"{table_name}/{datetime.now().strftime('%Y/%m/%d')}/" + f"{table_name}_{datetime.now().strftime('%H:%M:%S')}.csv" ) try: -- cgit v1.2.3 From 640b0685cd795c03b571b3ca26fc9030b86c4f99 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Tue, 20 Aug 2024 00:18:16 +0100 Subject: fix(extract_lambda): fix UnboundLocalError when db is called before it is assigned a value --- src/extract_lambda.py | 1 + 1 file changed, 1 insertion(+) (limited to 'src/extract_lambda.py') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 8575b08..7efaac0 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -29,6 +29,7 @@ def lambda_handler(event, context): and converts all tables to CSV and if any of those tables do not exist in, or are different to the ones in s3, it uploads them it uses 3 helper functions to achieve these 3 functionalities """ + db = None try: db = connect_to_database() existing_files = list_existing_s3_files() -- cgit v1.2.3 From e25bee6c1c9db8edaf3197f0dc48fa3c63e61744 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Tue, 20 Aug 2024 11:01:55 +0100 Subject: feat: revert s3 streaming to previous implementation for uploading --- src/extract_lambda.py | 56 +++++++++++++++++++++++---------------------------- 1 file changed, 25 insertions(+), 31 deletions(-) (limited to 'src/extract_lambda.py') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 7efaac0..4921034 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -49,7 +49,7 @@ def lambda_handler(event, context): ), } except Exception as e: - logger.error(f"Error: {e}") + logger.error(f"Error: {e}", exc_info=True) return {"statusCode": 500, "body": json.dumps("Internal server error.")} finally: if db: @@ -78,7 +78,7 @@ def retrieve_secrets(): def connect_to_database() -> Connection: try: - secrets = retrieve_secrets() + secrets = json.loads(retrieve_secrets()) host = secrets["host"] port = secrets["port"] user = secrets["user"] @@ -141,20 +141,6 @@ def get_latest_timestamp(existing_files): return max(all_datetimes) if all_datetimes else datetime.min -def stream_to_s3(table_name, rows, column_names, s3_client, bucket_name, s3_key): - csv_buffer = StringIO() - csv_writer = csv.writer(csv_buffer) - - csv_writer.writerow(column_names) - - for row in rows: - csv_writer.writerow(row) - - csv_buffer.seek(0) - - s3_client.upload_fileobj(csv_buffer, bucket_name, s3_key) - - def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): """Creates a list of the tables from a database query and then selects everything from each table in individual queries @@ -180,25 +166,29 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): f"SELECT * FROM {identifier(table_name)} WHERE last_updated >= :latest;", latest={datetime.strftime(latest_timestamp, "%Y-%m-%d %H:%M:%S")}, ) + # Creating a temporary file path and writing the column name to it followed by each row of data if rows: - column_names = [ - col_name[0] - for col_name in db.run( - """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS - WHERE table_name = :table ;""", - table=table_name, - ) - ] - - s3_key = ( - f"{table_name}/{datetime.now().strftime('%Y/%m/%d')}/" - f"{table_name}_{datetime.now().strftime('%H:%M:%S')}.csv" + csv_file_path = f"/tmp/{table_name}.csv" + with open(csv_file_path, "w", newline="") as file: + writer = csv.writer(file) + # column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")] + column_names = [ + col_name[0] + for col_name in db.run( + """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS + WHERE table_name = :table ;""", + table=table_name, + ) + ] + writer.writerow(column_names) + writer.writerows(rows) + s3_key = datetime.strftime( + datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv" ) + # Writing the new file to S3 extract bucket: try: - stream_to_s3( - table_name, rows, column_names, client, extract_bucket(), s3_key - ) + client.upload_file(csv_file_path, extract_bucket(), s3_key) load_status["updated"].append(table_name) logger.info(f"Uploaded {s3_key} to S3.") except ClientError as e: @@ -207,3 +197,7 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): load_status["no change"].append(table_name) logger.info(f"No new data") return load_status + + +if __name__ == "__main__": + lambda_handler(None, None) -- cgit v1.2.3 From 5211751b69a894874945e3a916c33781a327ab10 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Tue, 20 Aug 2024 11:26:26 +0100 Subject: feat: conditional logic for if bucket is empty --- src/extract_lambda.py | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) (limited to 'src/extract_lambda.py') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 4921034..6216446 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -124,6 +124,7 @@ def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client("s3 logger.error(f"Error retrieving S3 object {s3_key}: {e}") else: logger.error("The bucket is empty") + return None except ClientError as e: logger.error(f"Error listing S3 objects: {e}") @@ -132,13 +133,18 @@ def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client("s3 def get_latest_timestamp(existing_files): - all_datetimes = [] - for file_name in existing_files.keys(): - match = re.search(r"\/(.+/).+_(.+)\.csv", file_name) - if match: - datetime_str = "".join(match.group(1, 2)) - all_datetimes.append(datetime.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S")) - return max(all_datetimes) if all_datetimes else datetime.min + if existing_files: + all_datetimes = [] + for file_name in existing_files.keys(): + match = re.search(r"\/(.+/).+_(.+)\.csv", file_name) + if match: + datetime_str = "".join(match.group(1, 2)) + all_datetimes.append( + datetime.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S") + ) + return max(all_datetimes) if all_datetimes else datetime.min + + return existing_files def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): @@ -163,8 +169,16 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): for table in tables: table_name = table[0] rows = db.run( - f"SELECT * FROM {identifier(table_name)} WHERE last_updated >= :latest;", - latest={datetime.strftime(latest_timestamp, "%Y-%m-%d %H:%M:%S")}, + f""" + SELECT * FROM {identifier(table_name)} + WHERE last_updated >= :latest; + """, + latest={ + datetime.strftime( + latest_timestamp if latest_timestamp else datetime(1990, 1, 1), + "%Y-%m-%d %H:%M:%S", + ) + }, ) # Creating a temporary file path and writing the column name to it followed by each row of data if rows: -- cgit v1.2.3 From dc3a7e74ddf549dad05745c64201aaf0d3402213 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Tue, 20 Aug 2024 11:31:25 +0100 Subject: feat: add advanced logging --- src/extract_lambda.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) (limited to 'src/extract_lambda.py') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 6216446..9daf662 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -10,8 +10,12 @@ from botocore.exceptions import ClientError from pg8000.native import Connection, InterfaceError, identifier logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) - +logging.basicConfig( + format="{asctime} - {levelname} - {message}", + style="{", + datefmt="%Y-%m-%d %H:%M", + level=logging.INFO, +) # DB Exception class @@ -168,11 +172,13 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): for table in tables: table_name = table[0] - rows = db.run( - f""" + base_query = f""" SELECT * FROM {identifier(table_name)} WHERE last_updated >= :latest; - """, + """ + logger.info(f"Processing table: {table_name}") + rows = db.run( + base_query, latest={ datetime.strftime( latest_timestamp if latest_timestamp else datetime(1990, 1, 1), -- cgit v1.2.3 From 35397e8bad42a8c507d1fb13007c6da2f947e851 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Tue, 20 Aug 2024 11:44:30 +0100 Subject: feat: add additional logging and exclude unnecessary table --- src/extract_lambda.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) (limited to 'src/extract_lambda.py') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 9daf662..fe22192 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -165,7 +165,7 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): """ SELECT table_name FROM information_schema.tables - WHERE table_schema='public' + WHERE table_schema='public' AND table_name != '_prisma_migrations' AND table_type='BASE TABLE'; """ ) @@ -176,16 +176,18 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): SELECT * FROM {identifier(table_name)} WHERE last_updated >= :latest; """ - logger.info(f"Processing table: {table_name}") - rows = db.run( - base_query, - latest={ + latest = ( + { datetime.strftime( latest_timestamp if latest_timestamp else datetime(1990, 1, 1), "%Y-%m-%d %H:%M:%S", ) }, ) + logger.info(f"Processing table: {table_name}") + logger.info(f"Latest timestamp: {latest[0]}") + rows = db.run(base_query, latest=latest) + logger.info(f"Rows: {rows}") # Creating a temporary file path and writing the column name to it followed by each row of data if rows: csv_file_path = f"/tmp/{table_name}.csv" -- cgit v1.2.3 From be911e22a964bdf7d5a4421cde7d7c6df447ed5c Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Tue, 20 Aug 2024 11:49:59 +0100 Subject: refactor: change rows output to debug logger output --- src/extract_lambda.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'src/extract_lambda.py') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index fe22192..e9f438b 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -16,7 +16,6 @@ logging.basicConfig( datefmt="%Y-%m-%d %H:%M", level=logging.INFO, ) -# DB Exception class class DBConnectionException(Exception): @@ -187,7 +186,7 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): logger.info(f"Processing table: {table_name}") logger.info(f"Latest timestamp: {latest[0]}") rows = db.run(base_query, latest=latest) - logger.info(f"Rows: {rows}") + logger.debug(f"Rows: {rows}") # Creating a temporary file path and writing the column name to it followed by each row of data if rows: csv_file_path = f"/tmp/{table_name}.csv" -- cgit v1.2.3 From 2a914add8391f345ee1096b9deb729c05d3e06c3 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Tue, 20 Aug 2024 15:15:02 +0100 Subject: feat: add more logging for debugging --- src/extract_lambda.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'src/extract_lambda.py') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index e9f438b..24f0981 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -10,13 +10,16 @@ from botocore.exceptions import ClientError from pg8000.native import Connection, InterfaceError, identifier logger = logging.getLogger(__name__) + logging.basicConfig( format="{asctime} - {levelname} - {message}", style="{", datefmt="%Y-%m-%d %H:%M", - level=logging.INFO, + level=logging.DEBUG, ) +logging.getLogger("botocore").setLevel(logging.WARNING) + class DBConnectionException(Exception): """Wraps pg8000.native Error or DatabaseError.""" @@ -110,7 +113,7 @@ def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client("s3 results of listing the contents of the s3 bucket, then returns the populated dictionary """ - + logging.info("Listing existing S3 files") existing_files = {} try: -- cgit v1.2.3