From c8d8d2cee4262782890ea68cda8fc86f61098b09 Mon Sep 17 00:00:00 2001 From: Ang Bel Date: Tue, 13 Aug 2024 10:14:01 +0100 Subject: s3 buckets in tf and initial blank lambda py files set-up in src folder for the next task of lambda tf set up --- src/extract_lambda.py | 0 src/load_lambda.py | 0 src/transform_lambda.py | 0 3 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 src/extract_lambda.py create mode 100644 src/load_lambda.py create mode 100644 src/transform_lambda.py (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py new file mode 100644 index 0000000..e69de29 diff --git a/src/load_lambda.py b/src/load_lambda.py new file mode 100644 index 0000000..e69de29 diff --git a/src/transform_lambda.py b/src/transform_lambda.py new file mode 100644 index 0000000..e69de29 -- cgit v1.2.3 From 90545b9cf36e84d5868bdd17f22471f579f20ec4 Mon Sep 17 00:00:00 2001 From: T-Aji Date: Tue, 13 Aug 2024 11:30:28 +0100 Subject: database connection added to func --- src/extract_lambda.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index e69de29..7d56c66 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -0,0 +1,32 @@ +from pg8000.native import Connection, Error, DatabaseError, InterfaceError +from dotenv import load_dotenv +import os + +load_dotenv() + +def extract(): + +# temporary credentials for dev- will not have access when uploaded + + database = os.getenv('database') + user = os.getenv('user') + password = os.getenv('password') + host = os.getenv('host') + port = os.getenv('port') + + + try: + db = Connection.run( + database=database, + user=user, + password=password, + host=host, + port=port + ) + except DatabaseError as e: + print(e) + except InterfaceError as i: + print(i) + + + \ No newline at end of file -- cgit v1.2.3 From 974a8018f79d8592cbd6a59b1b26a9d288975328 Mon Sep 17 00:00:00 2001 From: T-Aji Date: Tue, 13 Aug 2024 12:30:42 +0100 Subject: dumps data to csv --- src/extract_lambda.py | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 7d56c66..8317ef8 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -1,22 +1,25 @@ from pg8000.native import Connection, Error, DatabaseError, InterfaceError from dotenv import load_dotenv import os +import boto3 +import csv +from botocore.exceptions import ClientError load_dotenv() -def extract(): +def lambda_handler(event, context): + client = boto3.client('s3') # temporary credentials for dev- will not have access when uploaded - + database = os.getenv('database') user = os.getenv('user') password = os.getenv('password') host = os.getenv('host') port = os.getenv('port') - try: - db = Connection.run( + db = Connection( database=database, user=user, password=password, @@ -27,6 +30,25 @@ def extract(): print(e) except InterfaceError as i: print(i) - + #replace prints with upload to cloudwatch logs + + tables = db.run("SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';") + for table in tables: + table_name = table[0] + rows = db.run(f"SELECT * FROM {table_name};") + # this saves the csv files to the repo root before writing to s3, this is unnecessary. how will the lambda behave when it attempts to save files? + with open(f"{table_name}.csv", "w", newline='') as file: + writer = csv.writer(file) + writer.writerow([desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")]) + writer.writerows(rows) + try: + client.upload_file(file, Bucket='ingestion-bucket', Object_name=table_name) + + except ClientError as e: + print(e) + #replace print with upload to cloudwatch logs + + if db: + db.close() \ No newline at end of file -- cgit v1.2.3 From cdb4577b5ad7ae1f708797de6bbf17e289bfac14 Mon Sep 17 00:00:00 2001 From: T-Aji Date: Tue, 13 Aug 2024 15:32:33 +0100 Subject: feat/ add logging & split task into 3 helper functions --- src/extract_lambda.py | 140 +++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 109 insertions(+), 31 deletions(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 8317ef8..11ea5d1 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -1,54 +1,132 @@ -from pg8000.native import Connection, Error, DatabaseError, InterfaceError +from pg8000.native import Connection, DatabaseError, InterfaceError from dotenv import load_dotenv import os import boto3 import csv from botocore.exceptions import ClientError +import logging +import json +logger = logging.getLogger() +logger.setLevel(logging.INFO) load_dotenv() -def lambda_handler(event, context): - client = boto3.client('s3') -# temporary credentials for dev- will not have access when uploaded +database = os.getenv('database') +user = os.getenv('user') +password = os.getenv('password') +host = os.getenv('host') +port = os.getenv('port') + +def lambda_handler(event, context): + """This lambda function connects to the Totesys database, lists the contents of the ingestion bucket, + and converts all tables to CSV and if any of those tables do not exist in, or are different to the ones in s3, it uploads them + it uses 3 helper functions to achieve these 3 functionalities + """ + try: + db = connect_to_database() + existing_files = list_existing_s3_files() + any_changes = process_and_upload_tables(db, existing_files) + + if not any_changes: + logger.info("No changes detected in the database.") + return { + 'statusCode': 200, + 'body': json.dumps('No changes detected, no CSV files were uploaded.') + } + else: + return { + 'statusCode': 200, + 'body': json.dumps('CSV files processed and uploaded successfully.') + } + + except Exception as e: + logger.error(f'Error: {e}') + return { + 'statusCode': 500, + 'body': json.dumps('Internal server error.') + } - database = os.getenv('database') - user = os.getenv('user') - password = os.getenv('password') - host = os.getenv('host') - port = os.getenv('port') + finally: + + if db: + db.close() +def connect_to_database(): try: - db = Connection( - database=database, - user=user, - password=password, - host=host, - port=port + return Connection( + database=database, + user=user, + password=password, + host=host, + port=port ) except DatabaseError as e: - print(e) + logger.error(f'Database error: {e}') + raise except InterfaceError as i: - print(i) - #replace prints with upload to cloudwatch logs + logger.error(f'Interface error: {i}') + raise + + +def list_existing_s3_files(): + """Creates a dictionary and populates it with the + results of listing the contents of the s3 bucket, then + returns the populated dictionary + """ + client = boto3.client('s3') + existing_files = {} + + try: + response = client.list_objects_v2(Bucket=ingestion_bucket) + + if 'Contents' in response: + for obj in response['Contents']: + s3_key = obj['Key'] + try: + file_obj = client.get_object(Bucket=ingestion_bucket, Key=s3_key) + file_content = file_obj['Body'].read().decode('utf-8') + existing_files[s3_key] = file_content + except ClientError as e: + logger.error(f'Error retrieving S3 object {s3_key}: {e}') + + except ClientError as e: + logger.error(f'Error listing S3 objects: {e}') + + return existing_files + + + +def process_and_upload_tables(db, existing_files): + """Creates a list of the tables from a database query and + then selects everything from each table in individual queries + it then writes each table to CSV files and compares with the item + in the existing_files dictionary with the same name. If it finds sny changes + to files, or new tables/files it uploads them to the s3 bucket + """ + client = boto3.client('s3') tables = db.run("SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';") + for table in tables: table_name = table[0] rows = db.run(f"SELECT * FROM {table_name};") - # this saves the csv files to the repo root before writing to s3, this is unnecessary. how will the lambda behave when it attempts to save files? - with open(f"{table_name}.csv", "w", newline='') as file: + + + csv_file_path = f"/tmp/{table_name}.csv" + with open(csv_file_path, "w", newline='') as file: writer = csv.writer(file) - writer.writerow([desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")]) + column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")] + writer.writerow(column_names) writer.writerows(rows) - try: - client.upload_file(file, Bucket='ingestion-bucket', Object_name=table_name) - - except ClientError as e: - print(e) - #replace print with upload to cloudwatch logs - - if db: - db.close() + + s3_key = f"{table_name}/latest.csv" + new_csv_content = open(csv_file_path, "r").read() + - \ No newline at end of file + if s3_key not in existing_files or existing_files[s3_key] != new_csv_content: + try: + client.upload_file(csv_file_path, ingestion_bucket, s3_key) + logger.info(f"Uploaded {s3_key} to S3.") + except ClientError as e: + logger.error(f'Error uploading to S3: {e}') \ No newline at end of file -- cgit v1.2.3 From 4f0d6f287ae83d7cdc0df6988ab7b9de10912f16 Mon Sep 17 00:00:00 2001 From: T-Aji Date: Wed, 14 Aug 2024 12:25:57 +0100 Subject: feat/passing tests to helper function list_existing_s3_files --- .gitignore | 3 +++ src/extract_lambda.py | 12 ++++++----- tests/dummy.txt | 1 + tests/test_extract_lambda.py | 49 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 60 insertions(+), 5 deletions(-) create mode 100644 .gitignore create mode 100644 tests/dummy.txt create mode 100644 tests/test_extract_lambda.py (limited to 'src') diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..428f94e --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +venv +.env +__pycache__/ \ No newline at end of file diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 11ea5d1..dc70590 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -18,6 +18,7 @@ password = os.getenv('password') host = os.getenv('host') port = os.getenv('port') + def lambda_handler(event, context): """This lambda function connects to the Totesys database, lists the contents of the ingestion bucket, and converts all tables to CSV and if any of those tables do not exist in, or are different to the ones in s3, it uploads them @@ -69,27 +70,28 @@ def connect_to_database(): raise - -def list_existing_s3_files(): +def list_existing_s3_files(bucket_name='extract_bucket', client=boto3.client('s3')): """Creates a dictionary and populates it with the results of listing the contents of the s3 bucket, then returns the populated dictionary """ - client = boto3.client('s3') + existing_files = {} try: - response = client.list_objects_v2(Bucket=ingestion_bucket) + response = client.list_objects_v2(Bucket='extract_bucket') if 'Contents' in response: for obj in response['Contents']: s3_key = obj['Key'] try: - file_obj = client.get_object(Bucket=ingestion_bucket, Key=s3_key) + file_obj = client.get_object(Bucket=bucket_name, Key=s3_key) file_content = file_obj['Body'].read().decode('utf-8') existing_files[s3_key] = file_content except ClientError as e: logger.error(f'Error retrieving S3 object {s3_key}: {e}') + else: + logger.error('The bucket is empty') except ClientError as e: logger.error(f'Error listing S3 objects: {e}') diff --git a/tests/dummy.txt b/tests/dummy.txt new file mode 100644 index 0000000..af27ff4 --- /dev/null +++ b/tests/dummy.txt @@ -0,0 +1 @@ +This is a test file. \ No newline at end of file diff --git a/tests/test_extract_lambda.py b/tests/test_extract_lambda.py new file mode 100644 index 0000000..472e93a --- /dev/null +++ b/tests/test_extract_lambda.py @@ -0,0 +1,49 @@ +import pytest +import boto3 +from moto import mock_aws +from src.extract_lambda import list_existing_s3_files #process_and_upload_tables +import os +import logging + + +@pytest.fixture(scope='class') +def aws_credentials(): + os.environ["AWS_ACCESS_KEY_ID"] = 'testing' + os.environ["AWS_SECRET_ACCESS_KEY"] = 'testing' + os.environ["AWS_SECURIT_TOKEN"] = 'testing' + os.environ["AWS_SESSION_TOKEN"] = 'testing' + os.environ["AWS_DEFAULT_REGION"]= 'eu-west-2' + +@pytest.fixture(scope='class') +def s3_client(aws_credentials): + with mock_aws(): + yield boto3.client('s3') + +class TestListExistings3Files(): + def test_error_if_no_bucket(self, s3_client, caplog): + + logger = logging.getLogger() + logger.info('Testing now.') + caplog.set_level(logging.ERROR) + list_existing_s3_files(client=s3_client) + assert 'Error listing S3 objects' in caplog.text + + def test_error_if_bucket_is_empty(self, s3_client, caplog): + + s3_client.create_bucket(Bucket='extract_bucket', + CreateBucketConfiguration={ + 'LocationConstraint': 'eu-west-2' + }) + list_existing_s3_files(client=s3_client) + assert 'The bucket is empty' in caplog.text + + def test_error_retrieving_object(self, s3_client, caplog): + s3_client.upload_file('tests/dummy.txt', 'extract_bucket', 'dummy.txt') + list_existing_s3_files(bucket_name='test_bucket', client=s3_client) + + assert 'Error retrieving S3 object ' in caplog.text + + def test_retrieves_file_content(self, s3_client, caplog): + result = list_existing_s3_files(client=s3_client) + + assert list(result.values()) == ['This is a test file.'] \ No newline at end of file -- cgit v1.2.3 From 101e1e24cb38b6a45661b723881e2b2d6dd2fb07 Mon Sep 17 00:00:00 2001 From: lian-manonog Date: Wed, 14 Aug 2024 14:35:05 +0100 Subject: wip: terraform debugging --- .gitignore | 5 ++++- src/load_lambda.py | 2 ++ src/transform_lambda.py | 2 ++ terraform/events.tf | 18 ++++++++++-------- terraform/s3.tf | 34 +++++++++++++++++----------------- 5 files changed, 35 insertions(+), 26 deletions(-) (limited to 'src') diff --git a/.gitignore b/.gitignore index 239c7e0..d759665 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,7 @@ *.tfvars *.tfvars.json .terraform.tfstate.lock.info -*.zip \ No newline at end of file +*.zip +.terraform/ +.terraform* +log* \ No newline at end of file diff --git a/src/load_lambda.py b/src/load_lambda.py index e69de29..6ee681f 100644 --- a/src/load_lambda.py +++ b/src/load_lambda.py @@ -0,0 +1,2 @@ +def lambda_handler(): + pass \ No newline at end of file diff --git a/src/transform_lambda.py b/src/transform_lambda.py index e69de29..6ee681f 100644 --- a/src/transform_lambda.py +++ b/src/transform_lambda.py @@ -0,0 +1,2 @@ +def lambda_handler(): + pass \ No newline at end of file diff --git a/terraform/events.tf b/terraform/events.tf index 6744085..9fd89e4 100644 --- a/terraform/events.tf +++ b/terraform/events.tf @@ -1,7 +1,17 @@ +resource "aws_cloudwatch_event_target" "extract_lambda_cw_event" { + rule = aws_cloudwatch_event_rule.lambda_trigger.name + target_id = "TargetFunctionV1" + arn = aws_lambda_function.extract_lambda.arn #replaced lambda name placeholder + force_destroy = true +} + resource "aws_cloudwatch_event_rule" "lambda_trigger" { name = "lambda-scheduled-trigger" description = "Schedule to trigger the Lambda function" schedule_expression = "rate(30 minutes)" + force_destroy = true + # depends_on = [ + # aws_cloudwatch_event_target.extract_lambda_cw_event] # event_pattern = jsonencode({ # detail-type = @@ -10,14 +20,6 @@ resource "aws_cloudwatch_event_rule" "lambda_trigger" { # }) } - -resource "aws_cloudwatch_event_target" "extract_lambda_cw_event" { - rule = aws_cloudwatch_event_rule.lambda_trigger.name - target_id = "TargetFunctionV1" - arn = aws_lambda_function.extract_lambda.arn #replaced lambda name placeholder -} - - resource "aws_lambda_permission" "allow_eventbridge" { statement_id = "AllowExecutionFromEventBridge" action = "lambda:InvokeFunction" diff --git a/terraform/s3.tf b/terraform/s3.tf index 8ab5622..4c06b8e 100644 --- a/terraform/s3.tf +++ b/terraform/s3.tf @@ -32,20 +32,20 @@ resource "aws_s3_bucket" "lambda_code_bucket" { bucket_prefix = "${var.s3_code_bucket_name}-" } -resource "aws_s3_object" "extract_lambda_code" { - bucket = aws_s3_bucket.lambda_code_bucket.bucket - key = "${var.extract_lambda_name}/extract_function.zip" - source = "${path.module}/../extract_function.zip" -} # << can't figure out how this is being used but we seem to need it - -resource "aws_s3_object" "transform_lambda_code" { - bucket = aws_s3_bucket.lambda_code_bucket.bucket - key = "${var.transform_lambda_name}/transform_function.zip" - source = "${path.module}/../transform_function.zip" -} # << can't figure out how this is being used but we seem to need it - -resource "aws_s3_object" "load_lambda_code" { - bucket = aws_s3_bucket.lambda_code_bucket.bucket - key = "${var.load_lambda_name}/load_function.zip" - source = "${path.module}/../load_function.zip" -} \ No newline at end of file +# resource "aws_s3_object" "extract_lambda_code" { +# bucket = aws_s3_bucket.lambda_code_bucket.bucket +# key = "${var.extract_lambda_name}/extract_function.zip" +# source = "${path.module}/../extract_function.zip" +# } # << can't figure out how this is being used but we seem to need it + +# resource "aws_s3_object" "transform_lambda_code" { +# bucket = aws_s3_bucket.lambda_code_bucket.bucket +# key = "${var.transform_lambda_name}/transform_function.zip" +# source = "${path.module}/../transform_function.zip" +# } # << can't figure out how this is being used but we seem to need it + +# resource "aws_s3_object" "load_lambda_code" { +# bucket = aws_s3_bucket.lambda_code_bucket.bucket +# key = "${var.load_lambda_name}/load_function.zip" +# source = "${path.module}/../load_function.zip" +# } \ No newline at end of file -- cgit v1.2.3 From 45e025ac0c4ae8c721cb0b875fd0abd67cc2bc07 Mon Sep 17 00:00:00 2001 From: T-Aji Date: Wed, 14 Aug 2024 15:53:11 +0100 Subject: test: passing test for function connect_to_database --- src/extract_lambda.py | 40 +++++++++++++++++++++++++--------------- tests/test_extract_lambda.py | 40 +++++++++++++++++++++++++++++++++++++--- 2 files changed, 62 insertions(+), 18 deletions(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index dc70590..6e94bba 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -1,6 +1,5 @@ from pg8000.native import Connection, DatabaseError, InterfaceError -from dotenv import load_dotenv -import os +from dotenv import dotenv_values import boto3 import csv from botocore.exceptions import ClientError @@ -9,16 +8,15 @@ import json logger = logging.getLogger() logger.setLevel(logging.INFO) -load_dotenv() - - -database = os.getenv('database') -user = os.getenv('user') -password = os.getenv('password') -host = os.getenv('host') -port = os.getenv('port') +class DBConnectionException(Exception): + """Wraps pg8000.native Error or DatabaseError.""" + def __init__(self, e): + """Initialise with provided error message.""" + self.message = str(e) + super().__init__(self.message) + def lambda_handler(event, context): """This lambda function connects to the Totesys database, lists the contents of the ingestion bucket, and converts all tables to CSV and if any of those tables do not exist in, or are different to the ones in s3, it uploads them @@ -53,8 +51,19 @@ def lambda_handler(event, context): if db: db.close() -def connect_to_database(): +def get_config(path: str = ".env") -> dict: + return dotenv_values(path) + + +def connect_to_database() -> Connection: try: + config = get_config() + host = config["host"] + port = config["port"] + user = config["user"] + password = config["password"] + database = config["database"] + return Connection( database=database, user=user, @@ -62,12 +71,13 @@ def connect_to_database(): host=host, port=port ) - except DatabaseError as e: - logger.error(f'Database error: {e}') - raise + # except DatabaseError as e: + # logger.error(f'Database error: {e}') + # raise except InterfaceError as i: logger.error(f'Interface error: {i}') - raise + raise DBConnectionException("Failed to connect to database") + def list_existing_s3_files(bucket_name='extract_bucket', client=boto3.client('s3')): diff --git a/tests/test_extract_lambda.py b/tests/test_extract_lambda.py index 472e93a..18c49fc 100644 --- a/tests/test_extract_lambda.py +++ b/tests/test_extract_lambda.py @@ -1,10 +1,24 @@ import pytest import boto3 from moto import mock_aws -from src.extract_lambda import list_existing_s3_files #process_and_upload_tables +from unittest.mock import patch +from unittest import TestCase +from src.extract_lambda import list_existing_s3_files, connect_to_database, DBConnectionException #process_and_upload_tables import os import logging +@pytest.fixture(scope='class') +def mock_config(): + env_vars = { + "host": "abc", + "port": "5432", + "user": "def", + "password": "password", + "database": "db", + } + with patch("src.extract_lambda.get_config", return_value=env_vars) as mock_config: + yield mock_config + @pytest.fixture(scope='class') def aws_credentials(): @@ -19,7 +33,7 @@ def s3_client(aws_credentials): with mock_aws(): yield boto3.client('s3') -class TestListExistings3Files(): +class TestListExistings3Files: def test_error_if_no_bucket(self, s3_client, caplog): logger = logging.getLogger() @@ -46,4 +60,24 @@ class TestListExistings3Files(): def test_retrieves_file_content(self, s3_client, caplog): result = list_existing_s3_files(client=s3_client) - assert list(result.values()) == ['This is a test file.'] \ No newline at end of file + assert list(result.values()) == ['This is a test file.'] + +class TestConnectToDatabase: + def test_connect_to_database(mock_conn, mock_config): + with patch("src.extract_lambda.Connection", autospec=True) as mock_conn: + connect_to_database() + mock_conn.assert_called_with( + host="abc", user="def", port="5432", password="password", database="db" + ) + + def test_database_error(self, mock_config): + with pytest.raises(DBConnectionException): + connect_to_database() + + def test_logs_interface_error(self, caplog): + logger = logging.getLogger() + logger.info('Testing now.') + caplog.set_level(logging.ERROR) + with pytest.raises(DBConnectionException): + connect_to_database() + assert 'Interface error' in caplog.text \ No newline at end of file -- cgit v1.2.3 From 848a86b7f3b9c5ce16cd774d19e3fa62ca8ffc68 Mon Sep 17 00:00:00 2001 From: T-Aji Date: Wed, 14 Aug 2024 18:14:01 +0100 Subject: test: mid-through test for process_and_upload_tables --- src/extract_lambda.py | 16 +++++++--------- tests/test_extract_lambda.py | 35 ++++++++++++++++++++++++++++++++--- 2 files changed, 39 insertions(+), 12 deletions(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 6e94bba..a70ecdd 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -5,6 +5,7 @@ import csv from botocore.exceptions import ClientError import logging import json +from datetime import datetime logger = logging.getLogger() logger.setLevel(logging.INFO) @@ -16,7 +17,7 @@ class DBConnectionException(Exception): """Initialise with provided error message.""" self.message = str(e) super().__init__(self.message) - + def lambda_handler(event, context): """This lambda function connects to the Totesys database, lists the contents of the ingestion bucket, and converts all tables to CSV and if any of those tables do not exist in, or are different to the ones in s3, it uploads them @@ -71,9 +72,6 @@ def connect_to_database() -> Connection: host=host, port=port ) - # except DatabaseError as e: - # logger.error(f'Database error: {e}') - # raise except InterfaceError as i: logger.error(f'Interface error: {i}') raise DBConnectionException("Failed to connect to database") @@ -110,14 +108,14 @@ def list_existing_s3_files(bucket_name='extract_bucket', client=boto3.client('s3 -def process_and_upload_tables(db, existing_files): +def process_and_upload_tables(db, existing_files, client=boto3.client('s3')): """Creates a list of the tables from a database query and then selects everything from each table in individual queries it then writes each table to CSV files and compares with the item - in the existing_files dictionary with the same name. If it finds sny changes + in the existing_files dictionary with the same name. If it finds any changes to files, or new tables/files it uploads them to the s3 bucket """ - client = boto3.client('s3') + tables = db.run("SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';") for table in tables: @@ -132,13 +130,13 @@ def process_and_upload_tables(db, existing_files): writer.writerow(column_names) writer.writerows(rows) - s3_key = f"{table_name}/latest.csv" + s3_key = f"{table_name}/{datetime.today().year}/{datetime.today().month}/{datetime.today().day}/{table_name}_{datetime.now().strftime('%H:%M:%S')}.csv" new_csv_content = open(csv_file_path, "r").read() if s3_key not in existing_files or existing_files[s3_key] != new_csv_content: try: - client.upload_file(csv_file_path, ingestion_bucket, s3_key) + client.upload_file(csv_file_path, 'extract_bucket', s3_key) logger.info(f"Uploaded {s3_key} to S3.") except ClientError as e: logger.error(f'Error uploading to S3: {e}') \ No newline at end of file diff --git a/tests/test_extract_lambda.py b/tests/test_extract_lambda.py index 18c49fc..74d7e2c 100644 --- a/tests/test_extract_lambda.py +++ b/tests/test_extract_lambda.py @@ -3,7 +3,7 @@ import boto3 from moto import mock_aws from unittest.mock import patch from unittest import TestCase -from src.extract_lambda import list_existing_s3_files, connect_to_database, DBConnectionException #process_and_upload_tables +from src.extract_lambda import list_existing_s3_files, connect_to_database, DBConnectionException, process_and_upload_tables import os import logging @@ -33,7 +33,7 @@ def s3_client(aws_credentials): with mock_aws(): yield boto3.client('s3') -class TestListExistings3Files: +class TestListExistingS3Files: def test_error_if_no_bucket(self, s3_client, caplog): logger = logging.getLogger() @@ -80,4 +80,33 @@ class TestConnectToDatabase: caplog.set_level(logging.ERROR) with pytest.raises(DBConnectionException): connect_to_database() - assert 'Interface error' in caplog.text \ No newline at end of file + assert 'Interface error' in caplog.text + +class TestProcessAndUploadTables: + def test_error_process_and_upload_tables(mock_conn, mock_config, s3_client, caplog, mocker): + logger = logging.getLogger() + logger.info('Testing now.') + caplog.set_level(logging.ERROR) + + with patch("src.extract_lambda.Connection", autospec=True) as mock_conn: + mock_db = connect_to_database() + # need to add a table + s3_key = 'dummy/2024/8/14/dummy_16:46:30.txt' + mock_existing_files = mocker.Mock(return_value={s3_key: 'This is a test file.' }) + s3_client.create_bucket(Bucket='extract_bucket', + CreateBucketConfiguration={ + 'LocationConstraint': 'eu-west-2' + }) + s3_client.upload_file('tests/dummy.txt', 'extract_bucket', s3_key) + process_and_upload_tables(mock_db, mock_existing_files, client=s3_client) + + assert 'Error uploading to S3' in caplog.text + +#@pytest.mark.describe("Helpers") +# @pytest.mark.it("Query processor returns correctly formatted dict") +# def test_process_query(): +# with patch("src.api.helpers.get_db_connection") as mock_conn: +# mock_conn().run.side_effect = db_data +# mock_conn().columns = sample_headers +# result = process_query("test query") +# assert result == sample_result \ No newline at end of file -- cgit v1.2.3 From fe548561acc5e133e3bee4026aab85db2e511bcd Mon Sep 17 00:00:00 2001 From: lian-manonog Date: Thu, 15 Aug 2024 13:51:53 +0100 Subject: wip: secrets manager pushing to merge with extract_lambda --- .gitignore | 1 + src/extract_lambda.py | 1 + src/secrets_manager.py | 48 ++++++++++++++++++++++++++++++++++++++++++++ test/test_secrets_manager.py | 34 +++++++++++++++++++++++++++++++ 4 files changed, 84 insertions(+) create mode 100644 src/secrets_manager.py create mode 100644 test/test_secrets_manager.py (limited to 'src') diff --git a/.gitignore b/.gitignore index d1df545..d164c3f 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ .terraform* log* .DS_Store +venv \ No newline at end of file diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 7d56c66..faa1d30 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -2,6 +2,7 @@ from pg8000.native import Connection, Error, DatabaseError, InterfaceError from dotenv import load_dotenv import os + load_dotenv() def extract(): diff --git a/src/secrets_manager.py b/src/secrets_manager.py new file mode 100644 index 0000000..c0fb61e --- /dev/null +++ b/src/secrets_manager.py @@ -0,0 +1,48 @@ +import boto3 +from botocore.exceptions import ClientError +import json + + +def sm_client(): + sm_client = boto3.client('secretsmanager') + yield sm_client + +def create_secret(sm_client, secret_name, cohort_id, user, password, host, database, port): + secret = { + "cohort_id": cohort_id, + "user": user, + "password": password, + "host": host, + "database": database, + "port": port + } + + response = sm_client.create_secret( + Name = secret_name, + SecretString = json.dumps(secret) + ) + + print(response) + return response + +def list_secret(sm_client): + response = sm_client.list_secrets() + secret_dict = response['SecretList'] + secret_names = [] + for items in secret_dict: + secret_names.append(items['Name']) + print(f'{len(secret_names)} secret(s) available') + for name in secret_names: + print(name) + return secret_names + +def retrieve_secrets(sm_client): + response = sm_client.get_secrets( + + ) + + + +#retrieve secret +#so lambda can access totesy db +#so lambda connect to the db and then retrieve the data \ No newline at end of file diff --git a/test/test_secrets_manager.py b/test/test_secrets_manager.py new file mode 100644 index 0000000..86533bc --- /dev/null +++ b/test/test_secrets_manager.py @@ -0,0 +1,34 @@ +from src.secrets_manager import sm_client, create_secret, list_secret +import boto3 +from moto import mock_aws +import json +import pytest +import os + +pytest.fixture(scope='class') +def mock_aws_credentials(): + """Mocked AWS Credentials for moto.""" + os.environ["AWS_ACCESS_KEY_ID"] = "testing" + os.environ["AWS_SECRET_ACCESS_KEY"] = "testing" + os.environ["AWS_SECURITY_TOKEN"] = "testing" + os.environ["AWS_SESSION_TOKEN"] = "testing" + os.environ["AWS_DEFAULT_REGION"] = "eu-west-2" + +@pytest.fixture(scope='class') +def mock_sm_client(mock_aws_credentials): + with mock_aws(): + yield boto3.client('secretsmanager') + + +def test_create_secret_stores_secrets(mock_sm_client): + cohort_id = "test_cohort_id" + user = "test_user_id" + password = "test_password" + host = "test_host" + database = "test_database" + port = "test_port" + + secret_name = "test_secret" + response = create_secret(mock_sm_client, secret_name, cohort_id, user, password, host, database, port) + + assert response['Name'] == secret_name \ No newline at end of file -- cgit v1.2.3 From c9bf342c8f6038a3f5397bfc8c53d251f27e7eec Mon Sep 17 00:00:00 2001 From: Ang Bel Date: Thu, 15 Aug 2024 16:45:47 +0100 Subject: procefss_and_upload_tables test in progress --- requirements.txt | 30 ++++++++++++++++++++++++++++ src/extract_lambda.py | 30 +++++++++++++++++++--------- tests/dummy_identical.csv | 4 ++++ tests/test_extract_lambda.py | 47 +++++++++++++++++++++----------------------- 4 files changed, 77 insertions(+), 34 deletions(-) create mode 100644 requirements.txt create mode 100644 tests/dummy_identical.csv (limited to 'src') diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..6f383f9 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,30 @@ +asn1crypto==1.5.1 +boto3==1.34.159 +botocore==1.34.159 +certifi==2024.7.4 +cffi==1.17.0 +charset-normalizer==3.3.2 +cryptography==43.0.0 +idna==3.7 +iniconfig==2.0.0 +Jinja2==3.1.4 +jmespath==1.0.1 +MarkupSafe==2.1.5 +moto==5.0.12 +packaging==24.1 +pg8000==1.31.2 +pluggy==1.5.0 +pycparser==2.22 +pytest==8.3.2 +pytest-mock==3.14.0 +python-dateutil==2.9.0.post0 +python-dotenv==1.0.1 +PyYAML==6.0.2 +requests==2.32.3 +responses==0.25.3 +s3transfer==0.10.2 +scramp==1.4.5 +six==1.16.0 +urllib3==2.2.2 +Werkzeug==3.0.3 +xmltodict==0.13.0 \ No newline at end of file diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 56b47a6..fb2d7e8 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -6,6 +6,7 @@ from botocore.exceptions import ClientError import logging import json from datetime import datetime +import re logger = logging.getLogger() @@ -117,9 +118,16 @@ def process_and_upload_tables(db, existing_files, client=boto3.client('s3')): in the existing_files dictionary with the same name. If it finds any changes to files, or new tables/files it uploads them to the s3 bucket """ - + ## NEW CODE + all_datetimes = [] + for file_names in existing_files.keys(): + datetime_str_on_s3 = ''.join(re.search(r'\/(.+/).+_(.+)\.csv',file_names).group(1,2)) + all_datetimes.append(datetime.strptime(datetime_str_on_s3, '%Y/%m/%d/%H:%M:%S')) + latest_timestamp = max(all_datetimes) + ## END OF NEW CODE + tables = db.run("SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';") - + print(tables) for table in tables: table_name = table[0] rows = db.run(f"SELECT * FROM {table_name};") @@ -128,17 +136,21 @@ def process_and_upload_tables(db, existing_files, client=boto3.client('s3')): csv_file_path = f"/tmp/{table_name}.csv" with open(csv_file_path, "w", newline='') as file: writer = csv.writer(file) - column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")] + #column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")] + column_names = [col_name[0] for col_name in db.run(f"SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = '{table_name}';")] writer.writerow(column_names) writer.writerows(rows) - - s3_key = f"{table_name}/{datetime.today().year}/{datetime.today().month}/{datetime.today().day}/{table_name}_{datetime.now().strftime('%H:%M:%S')}.csv" + s3_key = datetime.strftime(datetime.today(),f'{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv') new_csv_content = open(csv_file_path, "r").read() - - - if s3_key not in existing_files or existing_files[s3_key] != new_csv_content: + ## NEW CODE + latest_s3_object_key = datetime.strftime(latest_timestamp,f'{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv') + ## END OF NEW CODE + if existing_files[latest_s3_object_key] != new_csv_content: try: client.upload_file(csv_file_path, 'extract_bucket', s3_key) logger.info(f"Uploaded {s3_key} to S3.") except ClientError as e: - logger.error(f'Error uploading to S3: {e}') \ No newline at end of file + logger.error(f'Error uploading to S3: {e}') + else: + logger.info(f"No new data.") + \ No newline at end of file diff --git a/tests/dummy_identical.csv b/tests/dummy_identical.csv new file mode 100644 index 0000000..fdd8993 --- /dev/null +++ b/tests/dummy_identical.csv @@ -0,0 +1,4 @@ +Food_type,Flavour,Colour +Vegetable,Sour,Green +Berry,Sweet,Red + diff --git a/tests/test_extract_lambda.py b/tests/test_extract_lambda.py index 74d7e2c..e94a8a4 100644 --- a/tests/test_extract_lambda.py +++ b/tests/test_extract_lambda.py @@ -1,7 +1,7 @@ import pytest import boto3 from moto import mock_aws -from unittest.mock import patch +from unittest.mock import patch, MagicMock from unittest import TestCase from src.extract_lambda import list_existing_s3_files, connect_to_database, DBConnectionException, process_and_upload_tables import os @@ -81,32 +81,29 @@ class TestConnectToDatabase: with pytest.raises(DBConnectionException): connect_to_database() assert 'Interface error' in caplog.text - +''' class TestProcessAndUploadTables: - def test_error_process_and_upload_tables(mock_conn, mock_config, s3_client, caplog, mocker): + def test_error_process_and_upload_tables(mock_conn, mock_config, s3_client, caplog): logger = logging.getLogger() logger.info('Testing now.') caplog.set_level(logging.ERROR) - - with patch("src.extract_lambda.Connection", autospec=True) as mock_conn: - mock_db = connect_to_database() - # need to add a table - s3_key = 'dummy/2024/8/14/dummy_16:46:30.txt' - mock_existing_files = mocker.Mock(return_value={s3_key: 'This is a test file.' }) + #### + queries = ["SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';", + "SELECT * FROM Fruits;", + "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = 'Fruits'"] + return_values = [[['Fruits']], + [['Vegetable','Sour','Green'],['Berry','Sweet','Red']], + [['Food_type'],['Flavour'],['Colour']]] + vals = dict(zip(queries,return_values)) + + #### + with patch('src.extract_lambda.connect_to_database') as mock_db: + mock_db().run.side_effects = return_values + s3_key = 'Fruits/2024/08/15/Fruits_16:46:30.csv' + existing_files = {s3_key: 'Food_type,Flavour,Colour\nFruit,Sour,Green\nBerry,Sweet,Red'} s3_client.create_bucket(Bucket='extract_bucket', - CreateBucketConfiguration={ - 'LocationConstraint': 'eu-west-2' - }) - s3_client.upload_file('tests/dummy.txt', 'extract_bucket', s3_key) - process_and_upload_tables(mock_db, mock_existing_files, client=s3_client) - - assert 'Error uploading to S3' in caplog.text - -#@pytest.mark.describe("Helpers") -# @pytest.mark.it("Query processor returns correctly formatted dict") -# def test_process_query(): -# with patch("src.api.helpers.get_db_connection") as mock_conn: -# mock_conn().run.side_effect = db_data -# mock_conn().columns = sample_headers -# result = process_query("test query") -# assert result == sample_result \ No newline at end of file + CreateBucketConfiguration={'LocationConstraint': 'eu-west-2'}) + s3_client.upload_file('tests/dummy_identical.csv', 'extract_bucket', s3_key) + process_and_upload_tables(mock_db(), existing_files, client=s3_client) + assert 'No new data.' in caplog.text +''' \ No newline at end of file -- cgit v1.2.3 From 610261fec06ab3b6106465960d6935dd9df85df0 Mon Sep 17 00:00:00 2001 From: Ang Bel Date: Fri, 16 Aug 2024 09:46:53 +0100 Subject: Secrets manager integration into the extract lambda reviewed. --- src/extract_lambda.py | 29 +++++++++-------- tests/test_secrets_manager.py | 73 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+), 13 deletions(-) create mode 100644 tests/test_secrets_manager.py (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index fb2d7e8..3055f63 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -1,5 +1,4 @@ -from pg8000.native import Connection, DatabaseError, InterfaceError -from dotenv import dotenv_values +from pg8000.native import Connection, InterfaceError import boto3 import csv from botocore.exceptions import ClientError @@ -42,31 +41,35 @@ def lambda_handler(event, context): 'statusCode': 200, 'body': json.dumps('CSV files processed and uploaded successfully.') } - except Exception as e: logger.error(f'Error: {e}') return { 'statusCode': 500, 'body': json.dumps('Internal server error.') } - finally: - if db: db.close() -def get_config(path: str = ".env") -> dict: - return dotenv_values(path) +def retrieve_secrets(sm_client=boto3.client('secretsmanager'), secret_name='bentley-secrets'): + try: + response = sm_client.get_secret_value(SecretId=secret_name) + if 'SecretString' in response: + secret = json.loads(response['SecretString']) + return secret + except ClientError as e: + logger.error(f'Could not retrieve secrets: {e}') + raise e def connect_to_database() -> Connection: try: - config = get_config() - host = config["host"] - port = config["port"] - user = config["user"] - password = config["password"] - database = config["database"] + secrets = retrieve_secrets() + host = secrets["host"] + port = secrets["port"] + user = secrets["user"] + password = secrets["password"] + database = secrets["database"] return Connection( database=database, diff --git a/tests/test_secrets_manager.py b/tests/test_secrets_manager.py new file mode 100644 index 0000000..a30be86 --- /dev/null +++ b/tests/test_secrets_manager.py @@ -0,0 +1,73 @@ +from src.secrets_manager import sm_client, retrieve_secrets +import boto3 +import botocore.exceptions +from moto import mock_aws +import json +import pytest +import os + +@pytest.fixture(scope='function') +def aws_credentials(): + """Mocked AWS Credentials for moto.""" + os.environ["AWS_ACCESS_KEY_ID"] = "testing" + os.environ["AWS_SECRET_ACCESS_KEY"] = "testing" + os.environ["AWS_SECURITY_TOKEN"] = "testing" + os.environ["AWS_SESSION_TOKEN"] = "testing" + os.environ["AWS_DEFAULT_REGION"] = "eu-west-2" + +@pytest.fixture(scope='function') +def mock_sm_client(aws_credentials): + with mock_aws(): + yield boto3.client("secretsmanager") + +@pytest.fixture(scope='function') +def mock_store_secret(mock_sm_client): + secret = { + "cohort_id": "test_cohort_id", + "user": "test_user_id", + "password": "test_password", + "host": "test_host", + "database": "test_database", + "port": "test_port" + } + + secret_name = "test_secret" + + response = mock_sm_client.create_secret(Name=secret_name, SecretString=json.dumps(secret)) + + return response + +def test_retrieves_secrets_returns_dictionary(mock_sm_client, mock_store_secret): + secret_name = "test_secret" + + result = retrieve_secrets(mock_sm_client, secret_name) + + assert isinstance(result, dict) + +def test_retrieves_secrets_returns_correct_keys_and_values(mock_sm_client, mock_store_secret): + + secret_name = "test_secret" + + result = retrieve_secrets(mock_sm_client, secret_name) + + assert result["cohort_id"] == "test_cohort_id" + assert result["user"] == "test_user_id" + assert result["password"] == "test_password" + assert result["host"] == "test_host" + assert result["database"] == "test_database" + assert result["port"] == "test_port" + +def test_retrieves_secrets_raises_error_if_secret_name_incorrect_data_type(mock_sm_client): + secret_name = [1, 2, 3] + + + with pytest.raises(botocore.exceptions.ParamValidationError) as error: + retrieve_secrets(mock_sm_client, secret_name) + + +def test_retrieves_secrets_raises_error_if_secret_name_does_not_exist(mock_sm_client, mock_store_secret): + secret_name = 'test_secret_2' + + + with pytest.raises(botocore.exceptions.ClientError) as error: + retrieve_secrets(mock_sm_client, secret_name) \ No newline at end of file -- cgit v1.2.3 From 938ddda10ff2f7d5360ca0a939fa2f16d6beb09d Mon Sep 17 00:00:00 2001 From: Ang Bel Date: Fri, 16 Aug 2024 10:01:06 +0100 Subject: extract bucket name retrieval helper function and replace the bucket name placeholders --- src/extract_lambda.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 3055f63..f4c0c1d 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -82,9 +82,12 @@ def connect_to_database() -> Connection: logger.error(f'Interface error: {i}') raise DBConnectionException("Failed to connect to database") +def extract_bucket(client=boto3.client('s3')): + response = client.list_buckets() + extract_bucket_filter = [bucket['Name'] for bucket in response['Buckets'] if 'extract' in bucket['Name']] + return extract_bucket_filter[0] - -def list_existing_s3_files(bucket_name='extract_bucket', client=boto3.client('s3')): +def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client('s3')): """Creates a dictionary and populates it with the results of listing the contents of the s3 bucket, then returns the populated dictionary @@ -93,7 +96,7 @@ def list_existing_s3_files(bucket_name='extract_bucket', client=boto3.client('s3 existing_files = {} try: - response = client.list_objects_v2(Bucket='extract_bucket') + response = client.list_objects_v2(Bucket=bucket_name) if 'Contents' in response: for obj in response['Contents']: @@ -150,7 +153,7 @@ def process_and_upload_tables(db, existing_files, client=boto3.client('s3')): ## END OF NEW CODE if existing_files[latest_s3_object_key] != new_csv_content: try: - client.upload_file(csv_file_path, 'extract_bucket', s3_key) + client.upload_file(csv_file_path, extract_bucket(), s3_key) logger.info(f"Uploaded {s3_key} to S3.") except ClientError as e: logger.error(f'Error uploading to S3: {e}') -- cgit v1.2.3 From dd68d948dec97fedfcaa89806523975ad1224c71 Mon Sep 17 00:00:00 2001 From: Ang Bel Date: Fri, 16 Aug 2024 13:48:22 +0100 Subject: refactoring for extract lambda to filter by last updated and if not empty write it s3 --- .gitignore | 2 ++ src/extract_lambda.py | 26 +++++++++++--------------- 2 files changed, 13 insertions(+), 15 deletions(-) (limited to 'src') diff --git a/.gitignore b/.gitignore index ca15434..bceab93 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,5 @@ __pycache__/ # OS-Related Files .DS_Store + +*venv* diff --git a/src/extract_lambda.py b/src/extract_lambda.py index f4c0c1d..e348bef 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -136,9 +136,9 @@ def process_and_upload_tables(db, existing_files, client=boto3.client('s3')): print(tables) for table in tables: table_name = table[0] - rows = db.run(f"SELECT * FROM {table_name};") - + rows = db.run(f"SELECT * FROM {table_name} WHERE last_updated >= {datetime.strftime(latest_timestamp,'%H-%m-%d %H:%M:%S')};") + if rows: csv_file_path = f"/tmp/{table_name}.csv" with open(csv_file_path, "w", newline='') as file: writer = csv.writer(file) @@ -147,16 +147,12 @@ def process_and_upload_tables(db, existing_files, client=boto3.client('s3')): writer.writerow(column_names) writer.writerows(rows) s3_key = datetime.strftime(datetime.today(),f'{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv') - new_csv_content = open(csv_file_path, "r").read() - ## NEW CODE - latest_s3_object_key = datetime.strftime(latest_timestamp,f'{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv') - ## END OF NEW CODE - if existing_files[latest_s3_object_key] != new_csv_content: - try: - client.upload_file(csv_file_path, extract_bucket(), s3_key) - logger.info(f"Uploaded {s3_key} to S3.") - except ClientError as e: - logger.error(f'Error uploading to S3: {e}') - else: - logger.info(f"No new data.") - \ No newline at end of file + + try: + client.upload_file(csv_file_path, extract_bucket(), s3_key) + logger.info(f"Uploaded {s3_key} to S3.") + except ClientError as e: + logger.error(f'Error uploading to S3: {e}') + else: + logger.info(f"No new data.") + \ No newline at end of file -- cgit v1.2.3 From c284df39ed7735d736f4fe0f2571ba846b8f6315 Mon Sep 17 00:00:00 2001 From: "deepsource-autofix[bot]" <62050782+deepsource-autofix[bot]@users.noreply.github.com> Date: Fri, 16 Aug 2024 12:51:02 +0000 Subject: style: format code with Autopep8, Black and Ruff Formatter This commit fixes the style issues introduced in dd68d94 according to the output from Autopep8, Black and Ruff Formatter. Details: https://github.com/ajschofield/de-project-bentley/pull/47 --- src/extract_lambda.py | 130 +++++++++++++++++++++++++++----------------------- 1 file changed, 71 insertions(+), 59 deletions(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index e348bef..323d04a 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -20,48 +20,49 @@ class DBConnectionException(Exception): self.message = str(e) super().__init__(self.message) + def lambda_handler(event, context): """This lambda function connects to the Totesys database, lists the contents of the ingestion bucket, - and converts all tables to CSV and if any of those tables do not exist in, or are different to the ones in s3, it uploads them - it uses 3 helper functions to achieve these 3 functionalities + and converts all tables to CSV and if any of those tables do not exist in, or are different to the ones in s3, it uploads them + it uses 3 helper functions to achieve these 3 functionalities """ try: db = connect_to_database() existing_files = list_existing_s3_files() any_changes = process_and_upload_tables(db, existing_files) - + if not any_changes: logger.info("No changes detected in the database.") return { - 'statusCode': 200, - 'body': json.dumps('No changes detected, no CSV files were uploaded.') + "statusCode": 200, + "body": json.dumps("No changes detected, no CSV files were uploaded."), } else: return { - 'statusCode': 200, - 'body': json.dumps('CSV files processed and uploaded successfully.') + "statusCode": 200, + "body": json.dumps("CSV files processed and uploaded successfully."), } except Exception as e: - logger.error(f'Error: {e}') - return { - 'statusCode': 500, - 'body': json.dumps('Internal server error.') - } + logger.error(f"Error: {e}") + return {"statusCode": 500, "body": json.dumps("Internal server error.")} finally: if db: db.close() -def retrieve_secrets(sm_client=boto3.client('secretsmanager'), secret_name='bentley-secrets'): +def retrieve_secrets( + sm_client=boto3.client("secretsmanager"), secret_name="bentley-secrets" +): try: response = sm_client.get_secret_value(SecretId=secret_name) - if 'SecretString' in response: - secret = json.loads(response['SecretString']) + if "SecretString" in response: + secret = json.loads(response["SecretString"]) return secret except ClientError as e: - logger.error(f'Could not retrieve secrets: {e}') + logger.error(f"Could not retrieve secrets: {e}") raise e + def connect_to_database() -> Connection: try: secrets = retrieve_secrets() @@ -72,87 +73,98 @@ def connect_to_database() -> Connection: database = secrets["database"] return Connection( - database=database, - user=user, - password=password, - host=host, - port=port + database=database, user=user, password=password, host=host, port=port ) except InterfaceError as i: - logger.error(f'Interface error: {i}') + logger.error(f"Interface error: {i}") raise DBConnectionException("Failed to connect to database") -def extract_bucket(client=boto3.client('s3')): + +def extract_bucket(client=boto3.client("s3")): response = client.list_buckets() - extract_bucket_filter = [bucket['Name'] for bucket in response['Buckets'] if 'extract' in bucket['Name']] + extract_bucket_filter = [ + bucket["Name"] for bucket in response["Buckets"] if "extract" in bucket["Name"] + ] return extract_bucket_filter[0] -def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client('s3')): - """Creates a dictionary and populates it with the - results of listing the contents of the s3 bucket, then - returns the populated dictionary + +def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client("s3")): + """Creates a dictionary and populates it with the + results of listing the contents of the s3 bucket, then + returns the populated dictionary """ - + existing_files = {} - + try: response = client.list_objects_v2(Bucket=bucket_name) - - if 'Contents' in response: - for obj in response['Contents']: - s3_key = obj['Key'] + + if "Contents" in response: + for obj in response["Contents"]: + s3_key = obj["Key"] try: file_obj = client.get_object(Bucket=bucket_name, Key=s3_key) - file_content = file_obj['Body'].read().decode('utf-8') + file_content = file_obj["Body"].read().decode("utf-8") existing_files[s3_key] = file_content except ClientError as e: - logger.error(f'Error retrieving S3 object {s3_key}: {e}') + logger.error(f"Error retrieving S3 object {s3_key}: {e}") else: - logger.error('The bucket is empty') - + logger.error("The bucket is empty") + except ClientError as e: - logger.error(f'Error listing S3 objects: {e}') - - return existing_files + logger.error(f"Error listing S3 objects: {e}") + return existing_files -def process_and_upload_tables(db, existing_files, client=boto3.client('s3')): - """Creates a list of the tables from a database query and - then selects everything from each table in individual queries - it then writes each table to CSV files and compares with the item - in the existing_files dictionary with the same name. If it finds any changes - to files, or new tables/files it uploads them to the s3 bucket +def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): + """Creates a list of the tables from a database query and + then selects everything from each table in individual queries + it then writes each table to CSV files and compares with the item + in the existing_files dictionary with the same name. If it finds any changes + to files, or new tables/files it uploads them to the s3 bucket """ - ## NEW CODE + # NEW CODE all_datetimes = [] for file_names in existing_files.keys(): - datetime_str_on_s3 = ''.join(re.search(r'\/(.+/).+_(.+)\.csv',file_names).group(1,2)) - all_datetimes.append(datetime.strptime(datetime_str_on_s3, '%Y/%m/%d/%H:%M:%S')) + datetime_str_on_s3 = "".join( + re.search(r"\/(.+/).+_(.+)\.csv", file_names).group(1, 2) + ) + all_datetimes.append(datetime.strptime(datetime_str_on_s3, "%Y/%m/%d/%H:%M:%S")) latest_timestamp = max(all_datetimes) - ## END OF NEW CODE + # END OF NEW CODE - tables = db.run("SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';") + tables = db.run( + "SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';" + ) print(tables) for table in tables: table_name = table[0] - rows = db.run(f"SELECT * FROM {table_name} WHERE last_updated >= {datetime.strftime(latest_timestamp,'%H-%m-%d %H:%M:%S')};") + rows = db.run( + f"SELECT * FROM {table_name} WHERE last_updated >= {datetime.strftime(latest_timestamp,'%H-%m-%d %H:%M:%S')};" + ) if rows: csv_file_path = f"/tmp/{table_name}.csv" - with open(csv_file_path, "w", newline='') as file: + with open(csv_file_path, "w", newline="") as file: writer = csv.writer(file) - #column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")] - column_names = [col_name[0] for col_name in db.run(f"SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = '{table_name}';")] + # column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")] + column_names = [ + col_name[0] + for col_name in db.run( + f"SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = '{table_name}';" + ) + ] writer.writerow(column_names) writer.writerows(rows) - s3_key = datetime.strftime(datetime.today(),f'{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv') + s3_key = datetime.strftime( + datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv" + ) try: client.upload_file(csv_file_path, extract_bucket(), s3_key) logger.info(f"Uploaded {s3_key} to S3.") except ClientError as e: - logger.error(f'Error uploading to S3: {e}') + logger.error(f"Error uploading to S3: {e}") else: logger.info(f"No new data.") - \ No newline at end of file -- cgit v1.2.3 From aba65e0db08625c1ef0d3db6076b54e56e0b45ea Mon Sep 17 00:00:00 2001 From: Ang Bel Date: Fri, 16 Aug 2024 14:20:39 +0100 Subject: refactor following github actions major risk message --- src/extract_lambda.py | 65 ++++++++++++++++++++++++--------------------------- 1 file changed, 31 insertions(+), 34 deletions(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 323d04a..cc09e87 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -30,8 +30,8 @@ def lambda_handler(event, context): db = connect_to_database() existing_files = list_existing_s3_files() any_changes = process_and_upload_tables(db, existing_files) - - if not any_changes: + + if not any_changes['updated']: logger.info("No changes detected in the database.") return { "statusCode": 200, @@ -39,8 +39,9 @@ def lambda_handler(event, context): } else: return { - "statusCode": 200, - "body": json.dumps("CSV files processed and uploaded successfully."), + 'statusCode': 200, + 'body': json.dumps(f"""CSV files processed for {', '.join(any_changes['updated'])} and uploaded successfully.{ + 'The following tables were not updated: '+', '.join(any_changes['no change']) if any_changes['no change'] else ''}""") } except Exception as e: logger.error(f"Error: {e}") @@ -124,7 +125,8 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): in the existing_files dictionary with the same name. If it finds any changes to files, or new tables/files it uploads them to the s3 bucket """ - # NEW CODE + load_status = {'updated':[],'no change':[]} + ## Retrieving the latest file timestamp from S3 extract bucket all_datetimes = [] for file_names in existing_files.keys(): datetime_str_on_s3 = "".join( @@ -132,39 +134,34 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): ) all_datetimes.append(datetime.strptime(datetime_str_on_s3, "%Y/%m/%d/%H:%M:%S")) latest_timestamp = max(all_datetimes) - # END OF NEW CODE - tables = db.run( - "SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';" - ) - print(tables) + ## Iterating through tables on the database and retrieving only latest changes vs previous file load + tables = db.run("SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';") for table in tables: table_name = table[0] rows = db.run( f"SELECT * FROM {table_name} WHERE last_updated >= {datetime.strftime(latest_timestamp,'%H-%m-%d %H:%M:%S')};" ) - if rows: - csv_file_path = f"/tmp/{table_name}.csv" - with open(csv_file_path, "w", newline="") as file: - writer = csv.writer(file) - # column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")] - column_names = [ - col_name[0] - for col_name in db.run( - f"SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = '{table_name}';" - ) - ] - writer.writerow(column_names) - writer.writerows(rows) - s3_key = datetime.strftime( - datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv" - ) - - try: - client.upload_file(csv_file_path, extract_bucket(), s3_key) - logger.info(f"Uploaded {s3_key} to S3.") - except ClientError as e: - logger.error(f"Error uploading to S3: {e}") - else: - logger.info(f"No new data.") + ## Creating a temporary file path and writing the column name to it followed by each row of data + if rows: + csv_file_path = f"/tmp/{table_name}.csv" + with open(csv_file_path, "w", newline='') as file: + writer = csv.writer(file) + #column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")] + column_names = [col_name[0] for col_name in db.run(f"SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = '{table_name}';")] + writer.writerow(column_names) + writer.writerows(rows) + s3_key = datetime.strftime(datetime.today(),f'{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv') + + ## Writing the new file to S3 extract bucket: + try: + client.upload_file(csv_file_path, extract_bucket(), s3_key) + load_status['updated'].append(table_name) + logger.info(f"Uploaded {s3_key} to S3.") + except ClientError as e: + logger.error(f'Error uploading to S3: {e}') + else: + load_status['no change'].append(table_name) + logger.info(f"No new data in {table_name} name. Latest data retrieved is from {latest_timestamp}.") + return load_status -- cgit v1.2.3 From 4428b8d9e8903e93ca2efd9f95cea9205bf303a9 Mon Sep 17 00:00:00 2001 From: Ang Bel Date: Fri, 16 Aug 2024 14:42:15 +0100 Subject: refactoring to be more in line with pythonic code practices and prevent sql injection --- src/extract_lambda.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index cc09e87..d1a5c7c 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -1,4 +1,4 @@ -from pg8000.native import Connection, InterfaceError +from pg8000.native import Connection, InterfaceError, identifier import boto3 import csv from botocore.exceptions import ClientError @@ -136,12 +136,15 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): latest_timestamp = max(all_datetimes) ## Iterating through tables on the database and retrieving only latest changes vs previous file load - tables = db.run("SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';") + tables = db.run(""" + SELECT table_name + FROM information_schema.tables + WHERE table_schema='public' AND table_type='BASE TABLE';""") for table in tables: table_name = table[0] - rows = db.run( - f"SELECT * FROM {table_name} WHERE last_updated >= {datetime.strftime(latest_timestamp,'%H-%m-%d %H:%M:%S')};" - ) + rows = db.run(f"SELECT * FROM {identifier(table_name)} " + "WHERE last_updated >= :latest;", + latest={datetime.strftime(latest_timestamp,'%H-%m-%d %H:%M:%S')}) ## Creating a temporary file path and writing the column name to it followed by each row of data if rows: @@ -149,7 +152,9 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): with open(csv_file_path, "w", newline='') as file: writer = csv.writer(file) #column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")] - column_names = [col_name[0] for col_name in db.run(f"SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = '{table_name}';")] + column_names = [col_name[0] for col_name in + db.run("""SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS + WHERE table_name = :table ;""", table=table_name)] writer.writerow(column_names) writer.writerows(rows) s3_key = datetime.strftime(datetime.today(),f'{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv') -- cgit v1.2.3 From e153f2072eafca2c83a84e2c4210c46a40dabaf4 Mon Sep 17 00:00:00 2001 From: "deepsource-autofix[bot]" <62050782+deepsource-autofix[bot]@users.noreply.github.com> Date: Fri, 16 Aug 2024 14:36:15 +0000 Subject: style: format code with Autopep8, Black and Ruff Formatter This commit fixes the style issues introduced in 4428b8d according to the output from Autopep8, Black and Ruff Formatter. Details: https://github.com/ajschofield/de-project-bentley/pull/47 --- src/extract_lambda.py | 66 +++++++++++++++++++++++++++++++-------------------- 1 file changed, 40 insertions(+), 26 deletions(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index d1a5c7c..9a0e509 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -30,8 +30,8 @@ def lambda_handler(event, context): db = connect_to_database() existing_files = list_existing_s3_files() any_changes = process_and_upload_tables(db, existing_files) - - if not any_changes['updated']: + + if not any_changes["updated"]: logger.info("No changes detected in the database.") return { "statusCode": 200, @@ -39,9 +39,11 @@ def lambda_handler(event, context): } else: return { - 'statusCode': 200, - 'body': json.dumps(f"""CSV files processed for {', '.join(any_changes['updated'])} and uploaded successfully.{ - 'The following tables were not updated: '+', '.join(any_changes['no change']) if any_changes['no change'] else ''}""") + "statusCode": 200, + "body": json.dumps( + f"""CSV files processed for {', '.join(any_changes['updated'])} and uploaded successfully.{ + 'The following tables were not updated: '+', '.join(any_changes['no change']) if any_changes['no change'] else ''}""" + ), } except Exception as e: logger.error(f"Error: {e}") @@ -125,8 +127,8 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): in the existing_files dictionary with the same name. If it finds any changes to files, or new tables/files it uploads them to the s3 bucket """ - load_status = {'updated':[],'no change':[]} - ## Retrieving the latest file timestamp from S3 extract bucket + load_status = {"updated": [], "no change": []} + # Retrieving the latest file timestamp from S3 extract bucket all_datetimes = [] for file_names in existing_files.keys(): datetime_str_on_s3 = "".join( @@ -135,38 +137,50 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): all_datetimes.append(datetime.strptime(datetime_str_on_s3, "%Y/%m/%d/%H:%M:%S")) latest_timestamp = max(all_datetimes) - ## Iterating through tables on the database and retrieving only latest changes vs previous file load - tables = db.run(""" + # Iterating through tables on the database and retrieving only latest changes vs previous file load + tables = db.run( + """ SELECT table_name FROM information_schema.tables - WHERE table_schema='public' AND table_type='BASE TABLE';""") + WHERE table_schema='public' AND table_type='BASE TABLE';""" + ) for table in tables: table_name = table[0] - rows = db.run(f"SELECT * FROM {identifier(table_name)} " - "WHERE last_updated >= :latest;", - latest={datetime.strftime(latest_timestamp,'%H-%m-%d %H:%M:%S')}) + rows = db.run( + f"SELECT * FROM {identifier(table_name)} " "WHERE last_updated >= :latest;", + latest={datetime.strftime(latest_timestamp, "%H-%m-%d %H:%M:%S")}, + ) - ## Creating a temporary file path and writing the column name to it followed by each row of data + # Creating a temporary file path and writing the column name to it followed by each row of data if rows: csv_file_path = f"/tmp/{table_name}.csv" - with open(csv_file_path, "w", newline='') as file: + with open(csv_file_path, "w", newline="") as file: writer = csv.writer(file) - #column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")] - column_names = [col_name[0] for col_name in - db.run("""SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS - WHERE table_name = :table ;""", table=table_name)] + # column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")] + column_names = [ + col_name[0] + for col_name in db.run( + """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS + WHERE table_name = :table ;""", + table=table_name, + ) + ] writer.writerow(column_names) writer.writerows(rows) - s3_key = datetime.strftime(datetime.today(),f'{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv') + s3_key = datetime.strftime( + datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv" + ) - ## Writing the new file to S3 extract bucket: + # Writing the new file to S3 extract bucket: try: client.upload_file(csv_file_path, extract_bucket(), s3_key) - load_status['updated'].append(table_name) + load_status["updated"].append(table_name) logger.info(f"Uploaded {s3_key} to S3.") except ClientError as e: - logger.error(f'Error uploading to S3: {e}') + logger.error(f"Error uploading to S3: {e}") else: - load_status['no change'].append(table_name) - logger.info(f"No new data in {table_name} name. Latest data retrieved is from {latest_timestamp}.") - return load_status + load_status["no change"].append(table_name) + logger.info( + f"No new data in {table_name} name. Latest data retrieved is from {latest_timestamp}." + ) + return load_status -- cgit v1.2.3 From 890ca0434ce5f7c9e7bdba1482a86cd63a4ef8f9 Mon Sep 17 00:00:00 2001 From: Ang Bel Date: Fri, 16 Aug 2024 15:45:03 +0100 Subject: dummy comment to test checks --- src/extract_lambda.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 9a0e509..30c7005 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -11,7 +11,7 @@ import re logger = logging.getLogger() logger.setLevel(logging.INFO) - +## DB Exception class class DBConnectionException(Exception): """Wraps pg8000.native Error or DatabaseError.""" -- cgit v1.2.3 From 653cb35e50b339356274ff03c0d75ac3babf927f Mon Sep 17 00:00:00 2001 From: "deepsource-autofix[bot]" <62050782+deepsource-autofix[bot]@users.noreply.github.com> Date: Fri, 16 Aug 2024 14:45:16 +0000 Subject: style: format code with Autopep8, Black and Ruff Formatter This commit fixes the style issues introduced in 890ca04 according to the output from Autopep8, Black and Ruff Formatter. Details: https://github.com/ajschofield/de-project-bentley/pull/47 --- src/extract_lambda.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 30c7005..4168e27 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -11,7 +11,9 @@ import re logger = logging.getLogger() logger.setLevel(logging.INFO) -## DB Exception class +# DB Exception class + + class DBConnectionException(Exception): """Wraps pg8000.native Error or DatabaseError.""" -- cgit v1.2.3 From 43df5dd9c6bd21f33a7fccbc9b81ad3677637da5 Mon Sep 17 00:00:00 2001 From: "deepsource-autofix[bot]" <62050782+deepsource-autofix[bot]@users.noreply.github.com> Date: Mon, 19 Aug 2024 10:23:19 +0000 Subject: style: format code with Autopep8, Black and Ruff Formatter This commit fixes the style issues introduced in e27c6b4 according to the output from Autopep8, Black and Ruff Formatter. Details: https://github.com/ajschofield/de-project-bentley/pull/55 --- src/load_lambda.py | 2 +- src/secrets_manager.py | 31 +++++++++---------- src/transform_lambda.py | 2 +- test/test_secrets_manager.py | 19 +++++++----- tests/test_extract_lambda.py | 69 ++++++++++++++++++++++++------------------- tests/test_secrets_manager.py | 37 +++++++++++++++-------- 6 files changed, 93 insertions(+), 67 deletions(-) (limited to 'src') diff --git a/src/load_lambda.py b/src/load_lambda.py index 6ee681f..c6a8e60 100644 --- a/src/load_lambda.py +++ b/src/load_lambda.py @@ -1,2 +1,2 @@ def lambda_handler(): - pass \ No newline at end of file + pass diff --git a/src/secrets_manager.py b/src/secrets_manager.py index c0fb61e..3484688 100644 --- a/src/secrets_manager.py +++ b/src/secrets_manager.py @@ -4,45 +4,46 @@ import json def sm_client(): - sm_client = boto3.client('secretsmanager') + sm_client = boto3.client("secretsmanager") yield sm_client -def create_secret(sm_client, secret_name, cohort_id, user, password, host, database, port): + +def create_secret( + sm_client, secret_name, cohort_id, user, password, host, database, port +): secret = { "cohort_id": cohort_id, "user": user, "password": password, "host": host, "database": database, - "port": port + "port": port, } response = sm_client.create_secret( - Name = secret_name, - SecretString = json.dumps(secret) + Name=secret_name, SecretString=json.dumps(secret) ) print(response) return response + def list_secret(sm_client): response = sm_client.list_secrets() - secret_dict = response['SecretList'] + secret_dict = response["SecretList"] secret_names = [] for items in secret_dict: - secret_names.append(items['Name']) - print(f'{len(secret_names)} secret(s) available') + secret_names.append(items["Name"]) + print(f"{len(secret_names)} secret(s) available") for name in secret_names: print(name) return secret_names -def retrieve_secrets(sm_client): - response = sm_client.get_secrets( - - ) +def retrieve_secrets(sm_client): + response = sm_client.get_secrets() -#retrieve secret -#so lambda can access totesy db -#so lambda connect to the db and then retrieve the data \ No newline at end of file +# retrieve secret +# so lambda can access totesy db +# so lambda connect to the db and then retrieve the data diff --git a/src/transform_lambda.py b/src/transform_lambda.py index 6ee681f..c6a8e60 100644 --- a/src/transform_lambda.py +++ b/src/transform_lambda.py @@ -1,2 +1,2 @@ def lambda_handler(): - pass \ No newline at end of file + pass diff --git a/test/test_secrets_manager.py b/test/test_secrets_manager.py index 86533bc..cb4ec15 100644 --- a/test/test_secrets_manager.py +++ b/test/test_secrets_manager.py @@ -2,10 +2,12 @@ from src.secrets_manager import sm_client, create_secret, list_secret import boto3 from moto import mock_aws import json -import pytest +import pytest import os -pytest.fixture(scope='class') +pytest.fixture(scope="class") + + def mock_aws_credentials(): """Mocked AWS Credentials for moto.""" os.environ["AWS_ACCESS_KEY_ID"] = "testing" @@ -14,10 +16,11 @@ def mock_aws_credentials(): os.environ["AWS_SESSION_TOKEN"] = "testing" os.environ["AWS_DEFAULT_REGION"] = "eu-west-2" -@pytest.fixture(scope='class') + +@pytest.fixture(scope="class") def mock_sm_client(mock_aws_credentials): with mock_aws(): - yield boto3.client('secretsmanager') + yield boto3.client("secretsmanager") def test_create_secret_stores_secrets(mock_sm_client): @@ -29,6 +32,8 @@ def test_create_secret_stores_secrets(mock_sm_client): port = "test_port" secret_name = "test_secret" - response = create_secret(mock_sm_client, secret_name, cohort_id, user, password, host, database, port) - - assert response['Name'] == secret_name \ No newline at end of file + response = create_secret( + mock_sm_client, secret_name, cohort_id, user, password, host, database, port + ) + + assert response["Name"] == secret_name diff --git a/tests/test_extract_lambda.py b/tests/test_extract_lambda.py index e94a8a4..877e36a 100644 --- a/tests/test_extract_lambda.py +++ b/tests/test_extract_lambda.py @@ -3,11 +3,17 @@ import boto3 from moto import mock_aws from unittest.mock import patch, MagicMock from unittest import TestCase -from src.extract_lambda import list_existing_s3_files, connect_to_database, DBConnectionException, process_and_upload_tables -import os +from src.extract_lambda import ( + list_existing_s3_files, + connect_to_database, + DBConnectionException, + process_and_upload_tables, +) +import os import logging -@pytest.fixture(scope='class') + +@pytest.fixture(scope="class") def mock_config(): env_vars = { "host": "abc", @@ -20,54 +26,55 @@ def mock_config(): yield mock_config -@pytest.fixture(scope='class') +@pytest.fixture(scope="class") def aws_credentials(): - os.environ["AWS_ACCESS_KEY_ID"] = 'testing' - os.environ["AWS_SECRET_ACCESS_KEY"] = 'testing' - os.environ["AWS_SECURIT_TOKEN"] = 'testing' - os.environ["AWS_SESSION_TOKEN"] = 'testing' - os.environ["AWS_DEFAULT_REGION"]= 'eu-west-2' + os.environ["AWS_ACCESS_KEY_ID"] = "testing" + os.environ["AWS_SECRET_ACCESS_KEY"] = "testing" + os.environ["AWS_SECURIT_TOKEN"] = "testing" + os.environ["AWS_SESSION_TOKEN"] = "testing" + os.environ["AWS_DEFAULT_REGION"] = "eu-west-2" + -@pytest.fixture(scope='class') +@pytest.fixture(scope="class") def s3_client(aws_credentials): with mock_aws(): - yield boto3.client('s3') + yield boto3.client("s3") + class TestListExistingS3Files: def test_error_if_no_bucket(self, s3_client, caplog): - logger = logging.getLogger() - logger.info('Testing now.') + logger.info("Testing now.") caplog.set_level(logging.ERROR) list_existing_s3_files(client=s3_client) - assert 'Error listing S3 objects' in caplog.text + assert "Error listing S3 objects" in caplog.text def test_error_if_bucket_is_empty(self, s3_client, caplog): - - s3_client.create_bucket(Bucket='extract_bucket', - CreateBucketConfiguration={ - 'LocationConstraint': 'eu-west-2' - }) + s3_client.create_bucket( + Bucket="extract_bucket", + CreateBucketConfiguration={"LocationConstraint": "eu-west-2"}, + ) list_existing_s3_files(client=s3_client) - assert 'The bucket is empty' in caplog.text + assert "The bucket is empty" in caplog.text def test_error_retrieving_object(self, s3_client, caplog): - s3_client.upload_file('tests/dummy.txt', 'extract_bucket', 'dummy.txt') - list_existing_s3_files(bucket_name='test_bucket', client=s3_client) + s3_client.upload_file("tests/dummy.txt", "extract_bucket", "dummy.txt") + list_existing_s3_files(bucket_name="test_bucket", client=s3_client) - assert 'Error retrieving S3 object ' in caplog.text + assert "Error retrieving S3 object " in caplog.text def test_retrieves_file_content(self, s3_client, caplog): result = list_existing_s3_files(client=s3_client) - assert list(result.values()) == ['This is a test file.'] + assert list(result.values()) == ["This is a test file."] + class TestConnectToDatabase: def test_connect_to_database(mock_conn, mock_config): - with patch("src.extract_lambda.Connection", autospec=True) as mock_conn: + with patch("src.extract_lambda.Connection", autospec=True) as mock_conn: connect_to_database() mock_conn.assert_called_with( - host="abc", user="def", port="5432", password="password", database="db" + host="abc", user="def", port="5432", password="password", database="db" ) def test_database_error(self, mock_config): @@ -76,12 +83,14 @@ class TestConnectToDatabase: def test_logs_interface_error(self, caplog): logger = logging.getLogger() - logger.info('Testing now.') + logger.info("Testing now.") caplog.set_level(logging.ERROR) with pytest.raises(DBConnectionException): connect_to_database() - assert 'Interface error' in caplog.text -''' + assert "Interface error" in caplog.text + + +""" class TestProcessAndUploadTables: def test_error_process_and_upload_tables(mock_conn, mock_config, s3_client, caplog): logger = logging.getLogger() @@ -106,4 +115,4 @@ class TestProcessAndUploadTables: s3_client.upload_file('tests/dummy_identical.csv', 'extract_bucket', s3_key) process_and_upload_tables(mock_db(), existing_files, client=s3_client) assert 'No new data.' in caplog.text -''' \ No newline at end of file +""" diff --git a/tests/test_secrets_manager.py b/tests/test_secrets_manager.py index a30be86..609c572 100644 --- a/tests/test_secrets_manager.py +++ b/tests/test_secrets_manager.py @@ -3,10 +3,11 @@ import boto3 import botocore.exceptions from moto import mock_aws import json -import pytest +import pytest import os -@pytest.fixture(scope='function') + +@pytest.fixture(scope="function") def aws_credentials(): """Mocked AWS Credentials for moto.""" os.environ["AWS_ACCESS_KEY_ID"] = "testing" @@ -15,12 +16,14 @@ def aws_credentials(): os.environ["AWS_SESSION_TOKEN"] = "testing" os.environ["AWS_DEFAULT_REGION"] = "eu-west-2" -@pytest.fixture(scope='function') + +@pytest.fixture(scope="function") def mock_sm_client(aws_credentials): with mock_aws(): yield boto3.client("secretsmanager") -@pytest.fixture(scope='function') + +@pytest.fixture(scope="function") def mock_store_secret(mock_sm_client): secret = { "cohort_id": "test_cohort_id", @@ -28,15 +31,18 @@ def mock_store_secret(mock_sm_client): "password": "test_password", "host": "test_host", "database": "test_database", - "port": "test_port" + "port": "test_port", } secret_name = "test_secret" - response = mock_sm_client.create_secret(Name=secret_name, SecretString=json.dumps(secret)) + response = mock_sm_client.create_secret( + Name=secret_name, SecretString=json.dumps(secret) + ) return response + def test_retrieves_secrets_returns_dictionary(mock_sm_client, mock_store_secret): secret_name = "test_secret" @@ -44,8 +50,10 @@ def test_retrieves_secrets_returns_dictionary(mock_sm_client, mock_store_secret) assert isinstance(result, dict) -def test_retrieves_secrets_returns_correct_keys_and_values(mock_sm_client, mock_store_secret): +def test_retrieves_secrets_returns_correct_keys_and_values( + mock_sm_client, mock_store_secret +): secret_name = "test_secret" result = retrieve_secrets(mock_sm_client, secret_name) @@ -57,17 +65,20 @@ def test_retrieves_secrets_returns_correct_keys_and_values(mock_sm_client, mock_ assert result["database"] == "test_database" assert result["port"] == "test_port" -def test_retrieves_secrets_raises_error_if_secret_name_incorrect_data_type(mock_sm_client): - secret_name = [1, 2, 3] +def test_retrieves_secrets_raises_error_if_secret_name_incorrect_data_type( + mock_sm_client, +): + secret_name = [1, 2, 3] with pytest.raises(botocore.exceptions.ParamValidationError) as error: retrieve_secrets(mock_sm_client, secret_name) -def test_retrieves_secrets_raises_error_if_secret_name_does_not_exist(mock_sm_client, mock_store_secret): - secret_name = 'test_secret_2' - +def test_retrieves_secrets_raises_error_if_secret_name_does_not_exist( + mock_sm_client, mock_store_secret +): + secret_name = "test_secret_2" with pytest.raises(botocore.exceptions.ClientError) as error: - retrieve_secrets(mock_sm_client, secret_name) \ No newline at end of file + retrieve_secrets(mock_sm_client, secret_name) -- cgit v1.2.3 From b9f3576771c8af8933d23e95f7863f63e2bbc6aa Mon Sep 17 00:00:00 2001 From: lian-manonog Date: Mon, 19 Aug 2024 15:43:28 +0100 Subject: wip: fixed broken tests; hashed out test_error_retrieving_object --- src/extract_lambda.py | 1 + tests/test_extract_lambda.py | 49 ++++++++++++++++++++++++++------------------ 2 files changed, 30 insertions(+), 20 deletions(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 4168e27..217efdb 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -90,6 +90,7 @@ def extract_bucket(client=boto3.client("s3")): extract_bucket_filter = [ bucket["Name"] for bucket in response["Buckets"] if "extract" in bucket["Name"] ] + return extract_bucket_filter[0] diff --git a/tests/test_extract_lambda.py b/tests/test_extract_lambda.py index e94a8a4..665e419 100644 --- a/tests/test_extract_lambda.py +++ b/tests/test_extract_lambda.py @@ -1,11 +1,13 @@ +import boto3.exceptions +import botocore.exceptions import pytest import boto3 from moto import mock_aws from unittest.mock import patch, MagicMock from unittest import TestCase -from src.extract_lambda import list_existing_s3_files, connect_to_database, DBConnectionException, process_and_upload_tables -import os +from src.extract_lambda import list_existing_s3_files, connect_to_database, DBConnectionException, process_and_upload_tables, extract_bucket import logging +import os @pytest.fixture(scope='class') def mock_config(): @@ -16,7 +18,7 @@ def mock_config(): "password": "password", "database": "db", } - with patch("src.extract_lambda.get_config", return_value=env_vars) as mock_config: + with patch("src.extract_lambda.retrieve_secrets", return_value=env_vars) as mock_config: yield mock_config @@ -24,7 +26,7 @@ def mock_config(): def aws_credentials(): os.environ["AWS_ACCESS_KEY_ID"] = 'testing' os.environ["AWS_SECRET_ACCESS_KEY"] = 'testing' - os.environ["AWS_SECURIT_TOKEN"] = 'testing' + os.environ["AWS_SECURITY_TOKEN"] = 'testing' os.environ["AWS_SESSION_TOKEN"] = 'testing' os.environ["AWS_DEFAULT_REGION"]= 'eu-west-2' @@ -33,6 +35,14 @@ def s3_client(aws_credentials): with mock_aws(): yield boto3.client('s3') +@pytest.fixture(scope='class') +def s3_mock_bucket(s3_client): + bucket = s3_client.create_bucket(Bucket='extract_bucket', + CreateBucketConfiguration={ + 'LocationConstraint': 'eu-west-2' + }) + return bucket + class TestListExistingS3Files: def test_error_if_no_bucket(self, s3_client, caplog): @@ -42,35 +52,34 @@ class TestListExistingS3Files: list_existing_s3_files(client=s3_client) assert 'Error listing S3 objects' in caplog.text - def test_error_if_bucket_is_empty(self, s3_client, caplog): + def test_error_if_bucket_is_empty(self, s3_client, caplog, s3_mock_bucket): + list_existing_s3_files('extract_bucket', client=s3_client) + assert 'The bucket is empty' in caplog.text - s3_client.create_bucket(Bucket='extract_bucket', - CreateBucketConfiguration={ - 'LocationConstraint': 'eu-west-2' - }) - list_existing_s3_files(client=s3_client) - assert 'The bucket is empty' in caplog.text - def test_error_retrieving_object(self, s3_client, caplog): - s3_client.upload_file('tests/dummy.txt', 'extract_bucket', 'dummy.txt') - list_existing_s3_files(bucket_name='test_bucket', client=s3_client) + # def test_error_retrieving_object(self, s3_client, caplog, s3_mock_bucket): + # s3_client.upload_file('tests/dummy.txt', 'extract_bucket', 'dummy.txt') - assert 'Error retrieving S3 object ' in caplog.text + # list_existing_s3_files(bucket_name='extract_bucket', client=s3_client) - def test_retrieves_file_content(self, s3_client, caplog): - result = list_existing_s3_files(client=s3_client) + # assert 'Error retrieving S3 object dummy.txt: ClientError' in caplog.text + + + def test_retrieves_file_content(self, s3_client, caplog, s3_mock_bucket): + s3_client.upload_file('tests/dummy.txt', 'extract_bucket', 'dummy.txt') + result = list_existing_s3_files('extract_bucket', client=s3_client) - assert list(result.values()) == ['This is a test file.'] + assert list(result.values()) == ['This is a test file.'] class TestConnectToDatabase: - def test_connect_to_database(mock_conn, mock_config): + def test_connect_to_database(mock_conn, mock_config): ##had mock_config in param with patch("src.extract_lambda.Connection", autospec=True) as mock_conn: connect_to_database() mock_conn.assert_called_with( host="abc", user="def", port="5432", password="password", database="db" ) - def test_database_error(self, mock_config): + def test_database_error(self, mock_config): ##had mock_config in param with pytest.raises(DBConnectionException): connect_to_database() -- cgit v1.2.3 From c3c45c0d133ce32d48f1c72a0ac54f291038b1e7 Mon Sep 17 00:00:00 2001 From: Ellie Date: Mon, 19 Aug 2024 15:56:48 +0100 Subject: wip: fixing last test --- src/extract_lambda.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 4168e27..533bf82 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -147,12 +147,13 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): WHERE table_schema='public' AND table_type='BASE TABLE';""" ) for table in tables: + print(tables) table_name = table[0] rows = db.run( f"SELECT * FROM {identifier(table_name)} " "WHERE last_updated >= :latest;", latest={datetime.strftime(latest_timestamp, "%H-%m-%d %H:%M:%S")}, ) - + print('rows', rows) # Creating a temporary file path and writing the column name to it followed by each row of data if rows: csv_file_path = f"/tmp/{table_name}.csv" @@ -183,6 +184,6 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): else: load_status["no change"].append(table_name) logger.info( - f"No new data in {table_name} name. Latest data retrieved is from {latest_timestamp}." + f"No new data" ) return load_status -- cgit v1.2.3 From 982b8fa318c9065bd9037d14c56abcd126252978 Mon Sep 17 00:00:00 2001 From: Ellie Date: Mon, 19 Aug 2024 16:33:26 +0100 Subject: add working process and upload tables test --- src/extract_lambda.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 533bf82..5a5a631 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -150,8 +150,8 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): print(tables) table_name = table[0] rows = db.run( - f"SELECT * FROM {identifier(table_name)} " "WHERE last_updated >= :latest;", - latest={datetime.strftime(latest_timestamp, "%H-%m-%d %H:%M:%S")}, + f"SELECT * FROM {identifier(table_name)} WHERE last_updated >= :latest;", + latest={datetime.strftime(latest_timestamp, "%Y-%m-%d %H:%M:%S")}, ) print('rows', rows) # Creating a temporary file path and writing the column name to it followed by each row of data -- cgit v1.2.3 From a42d030fb663ad7eb040498cfc5f0627a27d6cc6 Mon Sep 17 00:00:00 2001 From: "deepsource-autofix[bot]" <62050782+deepsource-autofix[bot]@users.noreply.github.com> Date: Mon, 19 Aug 2024 16:11:44 +0000 Subject: style: format code with Autopep8, Black and Ruff Formatter This commit fixes the style issues introduced in 4f629e5 according to the output from Autopep8, Black and Ruff Formatter. Details: https://github.com/ajschofield/de-project-bentley/pull/65 --- src/extract_lambda.py | 8 +++----- tests/test_extract_lambda.py | 34 ++++++++++++++++++++-------------- 2 files changed, 23 insertions(+), 19 deletions(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 5a5a631..9b17ef2 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -151,9 +151,9 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): table_name = table[0] rows = db.run( f"SELECT * FROM {identifier(table_name)} WHERE last_updated >= :latest;", - latest={datetime.strftime(latest_timestamp, "%Y-%m-%d %H:%M:%S")}, + latest={datetime.strftime(latest_timestamp, "%Y-%m-%d %H:%M:%S")}, ) - print('rows', rows) + print("rows", rows) # Creating a temporary file path and writing the column name to it followed by each row of data if rows: csv_file_path = f"/tmp/{table_name}.csv" @@ -183,7 +183,5 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): logger.error(f"Error uploading to S3: {e}") else: load_status["no change"].append(table_name) - logger.info( - f"No new data" - ) + logger.info(f"No new data") return load_status diff --git a/tests/test_extract_lambda.py b/tests/test_extract_lambda.py index 3405743..5a1c5b2 100644 --- a/tests/test_extract_lambda.py +++ b/tests/test_extract_lambda.py @@ -12,7 +12,7 @@ from src.extract_lambda import ( DBConnectionException, lambda_handler, process_and_upload_tables, - retrieve_secrets + retrieve_secrets, ) @@ -25,7 +25,9 @@ def mock_config(): "password": "password", "database": "db", } - with patch("src.extract_lambda.retrieve_secrets", return_value=env_vars) as mock_config: + with patch( + "src.extract_lambda.retrieve_secrets", return_value=env_vars + ) as mock_config: yield mock_config @@ -185,31 +187,35 @@ class TestProcessAndUploadTables: queries = [ "SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';", "SELECT * FROM Fruits WHERE last_updated > :latest;", - "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = 'Fruits';" + "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = 'Fruits';", ] return_values = [ - [['Fruits']], + [["Fruits"]], [], # No new rows with a more recent last_updated timestamp - [['Food_type'], ['Flavour'], ['Colour'], ['last_updated']] + [["Food_type"], ["Flavour"], ["Colour"], ["last_updated"]], ] vals = dict(zip(queries, return_values)) # Patch the database connection and set return values for queries - with patch('src.extract_lambda.Connection') as mock_db: + with patch("src.extract_lambda.Connection") as mock_db: mock_db().run.side_effect = return_values - s3_key = 'Fruits/2024/08/15/Fruits_16:46:30.csv' + s3_key = "Fruits/2024/08/15/Fruits_16:46:30.csv" existing_files = { - s3_key: 'Food_type,Flavour,Colour,last_updated\nVegetable,Sour,Green,2022-11-03 14:20:49.962\nBerry,Sweet,Red,2022-11-03 14:20:49.962' + s3_key: "Food_type,Flavour,Colour,last_updated\nVegetable,Sour,Green,2022-11-03 14:20:49.962\nBerry,Sweet,Red,2022-11-03 14:20:49.962" } # Simulate S3 bucket and file setup - s3_client.create_bucket(Bucket='test_extract_bucket', - CreateBucketConfiguration={'LocationConstraint': 'eu-west-2'}) - s3_client.upload_file('tests/dummy_identical.csv', 'test_extract_bucket', s3_key) - + s3_client.create_bucket( + Bucket="test_extract_bucket", + CreateBucketConfiguration={"LocationConstraint": "eu-west-2"}, + ) + s3_client.upload_file( + "tests/dummy_identical.csv", "test_extract_bucket", s3_key + ) + # Run the process_and_upload_tables function process_and_upload_tables(mock_db(), existing_files, client=s3_client) # Assert that the log contains "No new data" - assert 'No new data' in caplog.text + assert "No new data" in caplog.text - # process and upload tables needs more tests \ No newline at end of file + # process and upload tables needs more tests -- cgit v1.2.3 From 88e71818aaf1bf67e4d2807d22d8122b7bf184f1 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Mon, 19 Aug 2024 22:20:21 +0100 Subject: refactor(log): implement logging ancestry - avoid using root logger --- src/extract_lambda.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 15fe785..6f841b4 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -8,7 +8,7 @@ from datetime import datetime import re -logger = logging.getLogger() +logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) # DB Exception class -- cgit v1.2.3 From 84b3dea3833ae65d53a1007567ee19c31bf34ee3 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Mon, 19 Aug 2024 22:28:31 +0100 Subject: refactor(retrieve_secrets): use aws recommended method for retrieving secrets --- src/extract_lambda.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 6f841b4..1df4c34 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -55,18 +55,21 @@ def lambda_handler(event, context): db.close() -def retrieve_secrets( - sm_client=boto3.client("secretsmanager"), secret_name="bentley-secrets" -): +def retrieve_secrets(): + secret_name = "bentley-secrets" + region_name = "eu-west-2" + + # Create a Secrets Manager client + session = boto3.session.Session() + client = session.client(service_name="secretsmanager", region_name=region_name) + try: - response = sm_client.get_secret_value(SecretId=secret_name) - if "SecretString" in response: - secret = json.loads(response["SecretString"]) - return secret + get_secret_value_response = client.get_secret_value(SecretId=secret_name) except ClientError as e: - logger.error(f"Could not retrieve secrets: {e}") raise e + return get_secret_value_response["SecretString"] + def connect_to_database() -> Connection: try: -- cgit v1.2.3 From 3d4d74aa69db85e3c840b3b73c028f4e9f83d1f7 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Mon, 19 Aug 2024 22:29:41 +0100 Subject: refactor(lambda_handler): remove unnecessary else statement --- src/extract_lambda.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 1df4c34..99117a4 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -39,14 +39,13 @@ def lambda_handler(event, context): "statusCode": 200, "body": json.dumps("No changes detected, no CSV files were uploaded."), } - else: - return { - "statusCode": 200, - "body": json.dumps( - f"""CSV files processed for {', '.join(any_changes['updated'])} and uploaded successfully.{ - 'The following tables were not updated: '+', '.join(any_changes['no change']) if any_changes['no change'] else ''}""" - ), - } + return { + "statusCode": 200, + "body": json.dumps( + f"""CSV files processed for {', '.join(any_changes['updated'])} and uploaded successfully.{ + 'The following tables were not updated: '+', '.join(any_changes['no change']) if any_changes['no change'] else ''}""" + ), + } except Exception as e: logger.error(f"Error: {e}") return {"statusCode": 500, "body": json.dumps("Internal server error.")} -- cgit v1.2.3 From 4699b3506307cb8556a7cc5f12fbe4df7a5c9a6b Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Mon, 19 Aug 2024 22:31:58 +0100 Subject: refactor(retrieve_secrets): improve error handling when retrieving secrets --- src/extract_lambda.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 99117a4..63a80ce 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -66,6 +66,9 @@ def retrieve_secrets(): get_secret_value_response = client.get_secret_value(SecretId=secret_name) except ClientError as e: raise e + except KeyError: + logger.error(f"Secret {secret_name} does not contain a SecretString") + raise ValueError(f"Secret {secret_name} does not contain a SecretString") return get_secret_value_response["SecretString"] -- cgit v1.2.3 From 8353621c862e75d1573ff8338852aa7d54d5d2e8 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Mon, 19 Aug 2024 22:36:37 +0100 Subject: refactor(retrieve_secrets): add logging for ClientError --- src/extract_lambda.py | 1 + 1 file changed, 1 insertion(+) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 63a80ce..485c021 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -65,6 +65,7 @@ def retrieve_secrets(): try: get_secret_value_response = client.get_secret_value(SecretId=secret_name) except ClientError as e: + logger.error(f"Failed to retrieve secret {secret_name}: {str(e)}") raise e except KeyError: logger.error(f"Secret {secret_name} does not contain a SecretString") -- cgit v1.2.3 From bcbadd508dbc1a53864e64cb1e2eccce53daa187 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Mon, 19 Aug 2024 22:37:43 +0100 Subject: chore: reorganise imports in extract_lambda --- src/extract_lambda.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 485c021..8353481 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -1,12 +1,12 @@ -from pg8000.native import Connection, InterfaceError, identifier -import boto3 import csv -from botocore.exceptions import ClientError -import logging import json -from datetime import datetime +import logging import re +from datetime import datetime +import boto3 +from botocore.exceptions import ClientError +from pg8000.native import Connection, InterfaceError, identifier logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) -- cgit v1.2.3 From caed81dc699b9b4105da2b8924310f1a370217c7 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Mon, 19 Aug 2024 23:13:39 +0100 Subject: refactor: add timestamp function in extract_lambda.py --- src/extract_lambda.py | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 8353481..ad3c970 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -129,6 +129,16 @@ def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client("s3 return existing_files +def get_latest_timestamp(existing_files): + all_datetimes = [] + for file_name in existing_files.keys(): + match = re.search(r"\/(.+/).+_(.+)\.csv", file_name) + if match: + datetime_str = "".join(match.group(1, 2)) + all_datetimes.append(datetime.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S")) + return max(all_datetimes) if all_datetimes else datetime.min + + def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): """Creates a list of the tables from a database query and then selects everything from each table in individual queries @@ -137,22 +147,17 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): to files, or new tables/files it uploads them to the s3 bucket """ load_status = {"updated": [], "no change": []} - # Retrieving the latest file timestamp from S3 extract bucket - all_datetimes = [] - for file_names in existing_files.keys(): - datetime_str_on_s3 = "".join( - re.search(r"\/(.+/).+_(.+)\.csv", file_names).group(1, 2) - ) - all_datetimes.append(datetime.strptime(datetime_str_on_s3, "%Y/%m/%d/%H:%M:%S")) - latest_timestamp = max(all_datetimes) + latest_timestamp = get_latest_timestamp(existing_files) - # Iterating through tables on the database and retrieving only latest changes vs previous file load tables = db.run( """ - SELECT table_name - FROM information_schema.tables - WHERE table_schema='public' AND table_type='BASE TABLE';""" + SELECT table_name + FROM information_schema.tables + WHERE table_schema='public' + AND table_type='BASE TABLE'; + """ ) + for table in tables: print(tables) table_name = table[0] -- cgit v1.2.3 From 610d23e7ed0f39e5ecb0dd25c3a1e3cba20d662e Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Mon, 19 Aug 2024 23:26:58 +0100 Subject: refactor: remove print statements in process_and_upload_tables --- src/extract_lambda.py | 2 -- 1 file changed, 2 deletions(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index ad3c970..7c6c3d1 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -159,13 +159,11 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): ) for table in tables: - print(tables) table_name = table[0] rows = db.run( f"SELECT * FROM {identifier(table_name)} WHERE last_updated >= :latest;", latest={datetime.strftime(latest_timestamp, "%Y-%m-%d %H:%M:%S")}, ) - print("rows", rows) # Creating a temporary file path and writing the column name to it followed by each row of data if rows: csv_file_path = f"/tmp/{table_name}.csv" -- cgit v1.2.3 From 5be3b130170c82360ff9715f5c09b9e815fc16f4 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Mon, 19 Aug 2024 23:32:25 +0100 Subject: feat: use buffers for s3 upload instead of csv files --- src/extract_lambda.py | 50 +++++++++++++++++++++++++++++++++----------------- 1 file changed, 33 insertions(+), 17 deletions(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 7c6c3d1..f38e24a 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -3,6 +3,7 @@ import json import logging import re from datetime import datetime +from io import StringIO import boto3 from botocore.exceptions import ClientError @@ -139,6 +140,26 @@ def get_latest_timestamp(existing_files): return max(all_datetimes) if all_datetimes else datetime.min +def stream_to_s3(table_name, rows, column_names, s3_client, bucket_name, s3_key): + csv_buffer = StringIO() + csv_writer = csv.writer(csv_buffer) + + csv_writer.writerow(column_names) + + for row in rows: + csv_writer.writerow(row) + + if csv_buffer.tell() > 5 * 1024 * 1024: + csv_buffer.seek(0) + s3_client.upload_fileobj(csv_buffer, bucket_name, s3_key) + csv_buffer.truncate(0) + csv_buffer.seek(0) + + if csv_buffer.tell() > 0: + csv_buffer.seek(0) + s3_client.upload_fileobj(csv_buffer, bucket_name, s3_key) + + def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): """Creates a list of the tables from a database query and then selects everything from each table in individual queries @@ -164,29 +185,24 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): f"SELECT * FROM {identifier(table_name)} WHERE last_updated >= :latest;", latest={datetime.strftime(latest_timestamp, "%Y-%m-%d %H:%M:%S")}, ) - # Creating a temporary file path and writing the column name to it followed by each row of data if rows: - csv_file_path = f"/tmp/{table_name}.csv" - with open(csv_file_path, "w", newline="") as file: - writer = csv.writer(file) - # column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")] - column_names = [ - col_name[0] - for col_name in db.run( - """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS - WHERE table_name = :table ;""", - table=table_name, - ) - ] - writer.writerow(column_names) - writer.writerows(rows) + column_names = [ + col_name[0] + for col_name in db.run( + """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS + WHERE table_name = :table ;""", + table=table_name, + ) + ] + s3_key = datetime.strftime( datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv" ) - # Writing the new file to S3 extract bucket: try: - client.upload_file(csv_file_path, extract_bucket(), s3_key) + stream_to_s3( + table_name, rows, column_names, client, extract_bucket(), s3_key + ) load_status["updated"].append(table_name) logger.info(f"Uploaded {s3_key} to S3.") except ClientError as e: -- cgit v1.2.3 From 3e80acb28eeeb0eaff97c2363124a8c6e95bcb13 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Mon, 19 Aug 2024 23:44:52 +0100 Subject: refactor: optimise s3 streaming & file naming --- src/extract_lambda.py | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index f38e24a..8575b08 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -149,15 +149,9 @@ def stream_to_s3(table_name, rows, column_names, s3_client, bucket_name, s3_key) for row in rows: csv_writer.writerow(row) - if csv_buffer.tell() > 5 * 1024 * 1024: - csv_buffer.seek(0) - s3_client.upload_fileobj(csv_buffer, bucket_name, s3_key) - csv_buffer.truncate(0) - csv_buffer.seek(0) + csv_buffer.seek(0) - if csv_buffer.tell() > 0: - csv_buffer.seek(0) - s3_client.upload_fileobj(csv_buffer, bucket_name, s3_key) + s3_client.upload_fileobj(csv_buffer, bucket_name, s3_key) def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): @@ -190,13 +184,14 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): col_name[0] for col_name in db.run( """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS - WHERE table_name = :table ;""", + WHERE table_name = :table ;""", table=table_name, ) ] - s3_key = datetime.strftime( - datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv" + s3_key = ( + f"{table_name}/{datetime.now().strftime('%Y/%m/%d')}/" + f"{table_name}_{datetime.now().strftime('%H:%M:%S')}.csv" ) try: -- cgit v1.2.3 From 640b0685cd795c03b571b3ca26fc9030b86c4f99 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Tue, 20 Aug 2024 00:18:16 +0100 Subject: fix(extract_lambda): fix UnboundLocalError when db is called before it is assigned a value --- src/extract_lambda.py | 1 + 1 file changed, 1 insertion(+) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 8575b08..7efaac0 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -29,6 +29,7 @@ def lambda_handler(event, context): and converts all tables to CSV and if any of those tables do not exist in, or are different to the ones in s3, it uploads them it uses 3 helper functions to achieve these 3 functionalities """ + db = None try: db = connect_to_database() existing_files = list_existing_s3_files() -- cgit v1.2.3 From e25bee6c1c9db8edaf3197f0dc48fa3c63e61744 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Tue, 20 Aug 2024 11:01:55 +0100 Subject: feat: revert s3 streaming to previous implementation for uploading --- src/extract_lambda.py | 56 +++++++++++++++++++++++---------------------------- 1 file changed, 25 insertions(+), 31 deletions(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 7efaac0..4921034 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -49,7 +49,7 @@ def lambda_handler(event, context): ), } except Exception as e: - logger.error(f"Error: {e}") + logger.error(f"Error: {e}", exc_info=True) return {"statusCode": 500, "body": json.dumps("Internal server error.")} finally: if db: @@ -78,7 +78,7 @@ def retrieve_secrets(): def connect_to_database() -> Connection: try: - secrets = retrieve_secrets() + secrets = json.loads(retrieve_secrets()) host = secrets["host"] port = secrets["port"] user = secrets["user"] @@ -141,20 +141,6 @@ def get_latest_timestamp(existing_files): return max(all_datetimes) if all_datetimes else datetime.min -def stream_to_s3(table_name, rows, column_names, s3_client, bucket_name, s3_key): - csv_buffer = StringIO() - csv_writer = csv.writer(csv_buffer) - - csv_writer.writerow(column_names) - - for row in rows: - csv_writer.writerow(row) - - csv_buffer.seek(0) - - s3_client.upload_fileobj(csv_buffer, bucket_name, s3_key) - - def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): """Creates a list of the tables from a database query and then selects everything from each table in individual queries @@ -180,25 +166,29 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): f"SELECT * FROM {identifier(table_name)} WHERE last_updated >= :latest;", latest={datetime.strftime(latest_timestamp, "%Y-%m-%d %H:%M:%S")}, ) + # Creating a temporary file path and writing the column name to it followed by each row of data if rows: - column_names = [ - col_name[0] - for col_name in db.run( - """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS - WHERE table_name = :table ;""", - table=table_name, - ) - ] - - s3_key = ( - f"{table_name}/{datetime.now().strftime('%Y/%m/%d')}/" - f"{table_name}_{datetime.now().strftime('%H:%M:%S')}.csv" + csv_file_path = f"/tmp/{table_name}.csv" + with open(csv_file_path, "w", newline="") as file: + writer = csv.writer(file) + # column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")] + column_names = [ + col_name[0] + for col_name in db.run( + """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS + WHERE table_name = :table ;""", + table=table_name, + ) + ] + writer.writerow(column_names) + writer.writerows(rows) + s3_key = datetime.strftime( + datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv" ) + # Writing the new file to S3 extract bucket: try: - stream_to_s3( - table_name, rows, column_names, client, extract_bucket(), s3_key - ) + client.upload_file(csv_file_path, extract_bucket(), s3_key) load_status["updated"].append(table_name) logger.info(f"Uploaded {s3_key} to S3.") except ClientError as e: @@ -207,3 +197,7 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): load_status["no change"].append(table_name) logger.info(f"No new data") return load_status + + +if __name__ == "__main__": + lambda_handler(None, None) -- cgit v1.2.3 From 5211751b69a894874945e3a916c33781a327ab10 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Tue, 20 Aug 2024 11:26:26 +0100 Subject: feat: conditional logic for if bucket is empty --- src/extract_lambda.py | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 4921034..6216446 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -124,6 +124,7 @@ def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client("s3 logger.error(f"Error retrieving S3 object {s3_key}: {e}") else: logger.error("The bucket is empty") + return None except ClientError as e: logger.error(f"Error listing S3 objects: {e}") @@ -132,13 +133,18 @@ def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client("s3 def get_latest_timestamp(existing_files): - all_datetimes = [] - for file_name in existing_files.keys(): - match = re.search(r"\/(.+/).+_(.+)\.csv", file_name) - if match: - datetime_str = "".join(match.group(1, 2)) - all_datetimes.append(datetime.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S")) - return max(all_datetimes) if all_datetimes else datetime.min + if existing_files: + all_datetimes = [] + for file_name in existing_files.keys(): + match = re.search(r"\/(.+/).+_(.+)\.csv", file_name) + if match: + datetime_str = "".join(match.group(1, 2)) + all_datetimes.append( + datetime.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S") + ) + return max(all_datetimes) if all_datetimes else datetime.min + + return existing_files def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): @@ -163,8 +169,16 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): for table in tables: table_name = table[0] rows = db.run( - f"SELECT * FROM {identifier(table_name)} WHERE last_updated >= :latest;", - latest={datetime.strftime(latest_timestamp, "%Y-%m-%d %H:%M:%S")}, + f""" + SELECT * FROM {identifier(table_name)} + WHERE last_updated >= :latest; + """, + latest={ + datetime.strftime( + latest_timestamp if latest_timestamp else datetime(1990, 1, 1), + "%Y-%m-%d %H:%M:%S", + ) + }, ) # Creating a temporary file path and writing the column name to it followed by each row of data if rows: -- cgit v1.2.3 From dc3a7e74ddf549dad05745c64201aaf0d3402213 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Tue, 20 Aug 2024 11:31:25 +0100 Subject: feat: add advanced logging --- src/extract_lambda.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 6216446..9daf662 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -10,8 +10,12 @@ from botocore.exceptions import ClientError from pg8000.native import Connection, InterfaceError, identifier logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) - +logging.basicConfig( + format="{asctime} - {levelname} - {message}", + style="{", + datefmt="%Y-%m-%d %H:%M", + level=logging.INFO, +) # DB Exception class @@ -168,11 +172,13 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): for table in tables: table_name = table[0] - rows = db.run( - f""" + base_query = f""" SELECT * FROM {identifier(table_name)} WHERE last_updated >= :latest; - """, + """ + logger.info(f"Processing table: {table_name}") + rows = db.run( + base_query, latest={ datetime.strftime( latest_timestamp if latest_timestamp else datetime(1990, 1, 1), -- cgit v1.2.3 From 35397e8bad42a8c507d1fb13007c6da2f947e851 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Tue, 20 Aug 2024 11:44:30 +0100 Subject: feat: add additional logging and exclude unnecessary table --- src/extract_lambda.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 9daf662..fe22192 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -165,7 +165,7 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): """ SELECT table_name FROM information_schema.tables - WHERE table_schema='public' + WHERE table_schema='public' AND table_name != '_prisma_migrations' AND table_type='BASE TABLE'; """ ) @@ -176,16 +176,18 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): SELECT * FROM {identifier(table_name)} WHERE last_updated >= :latest; """ - logger.info(f"Processing table: {table_name}") - rows = db.run( - base_query, - latest={ + latest = ( + { datetime.strftime( latest_timestamp if latest_timestamp else datetime(1990, 1, 1), "%Y-%m-%d %H:%M:%S", ) }, ) + logger.info(f"Processing table: {table_name}") + logger.info(f"Latest timestamp: {latest[0]}") + rows = db.run(base_query, latest=latest) + logger.info(f"Rows: {rows}") # Creating a temporary file path and writing the column name to it followed by each row of data if rows: csv_file_path = f"/tmp/{table_name}.csv" -- cgit v1.2.3 From be911e22a964bdf7d5a4421cde7d7c6df447ed5c Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Tue, 20 Aug 2024 11:49:59 +0100 Subject: refactor: change rows output to debug logger output --- src/extract_lambda.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index fe22192..e9f438b 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -16,7 +16,6 @@ logging.basicConfig( datefmt="%Y-%m-%d %H:%M", level=logging.INFO, ) -# DB Exception class class DBConnectionException(Exception): @@ -187,7 +186,7 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): logger.info(f"Processing table: {table_name}") logger.info(f"Latest timestamp: {latest[0]}") rows = db.run(base_query, latest=latest) - logger.info(f"Rows: {rows}") + logger.debug(f"Rows: {rows}") # Creating a temporary file path and writing the column name to it followed by each row of data if rows: csv_file_path = f"/tmp/{table_name}.csv" -- cgit v1.2.3 From 2a914add8391f345ee1096b9deb729c05d3e06c3 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Tue, 20 Aug 2024 15:15:02 +0100 Subject: feat: add more logging for debugging --- src/extract_lambda.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index e9f438b..24f0981 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -10,13 +10,16 @@ from botocore.exceptions import ClientError from pg8000.native import Connection, InterfaceError, identifier logger = logging.getLogger(__name__) + logging.basicConfig( format="{asctime} - {levelname} - {message}", style="{", datefmt="%Y-%m-%d %H:%M", - level=logging.INFO, + level=logging.DEBUG, ) +logging.getLogger("botocore").setLevel(logging.WARNING) + class DBConnectionException(Exception): """Wraps pg8000.native Error or DatabaseError.""" @@ -110,7 +113,7 @@ def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client("s3 results of listing the contents of the s3 bucket, then returns the populated dictionary """ - + logging.info("Listing existing S3 files") existing_files = {} try: -- cgit v1.2.3 From 53686e2e466bc38f65da15ec617b43e43a1af9f7 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Tue, 20 Aug 2024 15:25:13 +0100 Subject: chore: tidy-up repository & remove unused files --- Makefile | 80 -------------------------------------------------- src/secrets_manager.py | 49 ------------------------------- test.py | 0 3 files changed, 129 deletions(-) delete mode 100644 Makefile delete mode 100644 src/secrets_manager.py delete mode 100644 test.py (limited to 'src') diff --git a/Makefile b/Makefile deleted file mode 100644 index 077cd98..0000000 --- a/Makefile +++ /dev/null @@ -1,80 +0,0 @@ -############################################## -# # -# MAKEFILE TO BUILD THE PROJECT # -# # -############################################## - -PROJECT_NAME = de-project-bentley -REGION = eu-west-2 -PYTHON_INTERPRETER = python -WD=$(shell pwd) -PYTHONPATH=${WD} -SHELL := /bin/bash -PROFILE = default -PIP:=pip - -## PYTHON INTERPRETER ENVIRONMENT -create-environment: - @echo ">>> About to create environment: $(PROJECT_NAME)..." - @echo ">>> check python3 version" - ( \ - $(PYTHON_INTERPRETER) --version; \ - ) - @echo ">>> Setting up VirtualEnv." - ( \ - $(PIP) install -q virtualenv virtualenvwrapper; \ - virtualenv venv --python=$(PYTHON_INTERPRETER); \ - ) - -ACTIVATE_ENV := source venv/bin/activate - -# Execute python related functionalities from within the project's environment -define execute_in_env - $(ACTIVATE_ENV) && $1 -endef - -## Build the environment requirements -requirements: create-environment - $(call execute_in_env, $(PIP) install -r ./requirements.txt) - -# Set Up -## Install bandit -bandit: - $(call execute_in_env, $(PIP) install bandit) - -## Install safety -safety: - $(call execute_in_env, $(PIP) install safety) - -## Install black -black: - $(call execute_in_env, $(PIP) install black) - -## Install coverage -coverage: - $(call execute_in_env, $(PIP) install coverage) - -## Set up dev requirements (bandit, safety, black) -dev-setup: bandit safety black coverage - -# Build / Run - -## Run the security test (bandit + safety) -security-test: - $(call execute_in_env, safety check -r ./requirements.txt) - $(call execute_in_env, bandit -lll */*.py *c/*/*.py) - -## Run the black code check -run-black: - $(call execute_in_env, black ./src/*/*.py ./test/*/*.py) - -## Run the unit tests -unit-test: - $(call execute_in_env, PYTHONPATH=${PYTHONPATH} pytest -v) - -## Run the coverage check -check-coverage: - $(call execute_in_env, PYTHONPATH=${PYTHONPATH} pytest --cov=src test/) - -## Run all checks -run-checks: security-test run-black unit-test check-coverage diff --git a/src/secrets_manager.py b/src/secrets_manager.py deleted file mode 100644 index 3484688..0000000 --- a/src/secrets_manager.py +++ /dev/null @@ -1,49 +0,0 @@ -import boto3 -from botocore.exceptions import ClientError -import json - - -def sm_client(): - sm_client = boto3.client("secretsmanager") - yield sm_client - - -def create_secret( - sm_client, secret_name, cohort_id, user, password, host, database, port -): - secret = { - "cohort_id": cohort_id, - "user": user, - "password": password, - "host": host, - "database": database, - "port": port, - } - - response = sm_client.create_secret( - Name=secret_name, SecretString=json.dumps(secret) - ) - - print(response) - return response - - -def list_secret(sm_client): - response = sm_client.list_secrets() - secret_dict = response["SecretList"] - secret_names = [] - for items in secret_dict: - secret_names.append(items["Name"]) - print(f"{len(secret_names)} secret(s) available") - for name in secret_names: - print(name) - return secret_names - - -def retrieve_secrets(sm_client): - response = sm_client.get_secrets() - - -# retrieve secret -# so lambda can access totesy db -# so lambda connect to the db and then retrieve the data diff --git a/test.py b/test.py deleted file mode 100644 index e69de29..0000000 -- cgit v1.2.3