diff options
| author | lian-manonog <160282780+lian-manonog@users.noreply.github.com> | 2024-08-15 13:55:44 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-08-15 13:55:44 +0100 |
| commit | a009ffe72a2005e72e67345f728539e500b899f5 (patch) | |
| tree | 1bdebb2046a9b1356faa2fe902d9187601ecb3f7 | |
| parent | fe548561acc5e133e3bee4026aab85db2e511bcd (diff) | |
| parent | 848a86b7f3b9c5ce16cd774d19e3fa62ca8ffc68 (diff) | |
| download | de-project-bentley-a009ffe72a2005e72e67345f728539e500b899f5.tar.gz de-project-bentley-a009ffe72a2005e72e67345f728539e500b899f5.zip | |
Merge branch 'feature-extract-lambda-data-extraction' into tf-secrets-manager
| -rw-r--r-- | .gitignore | 5 | ||||
| -rw-r--r-- | src/extract_lambda.py | 153 | ||||
| -rw-r--r-- | tests/dummy.txt | 1 | ||||
| -rw-r--r-- | tests/test_extract_lambda.py | 112 |
4 files changed, 249 insertions, 22 deletions
@@ -8,4 +8,7 @@ .terraform* log* .DS_Store -venv
\ No newline at end of file + +venv +.env +__pycache__/ diff --git a/src/extract_lambda.py b/src/extract_lambda.py index faa1d30..56b47a6 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -1,33 +1,144 @@ -from pg8000.native import Connection, Error, DatabaseError, InterfaceError -from dotenv import load_dotenv -import os +from pg8000.native import Connection, DatabaseError, InterfaceError +from dotenv import dotenv_values +import boto3 +import csv +from botocore.exceptions import ClientError +import logging +import json +from datetime import datetime -load_dotenv() +logger = logging.getLogger() +logger.setLevel(logging.INFO) -def extract(): -# temporary credentials for dev- will not have access when uploaded +class DBConnectionException(Exception): + """Wraps pg8000.native Error or DatabaseError.""" - database = os.getenv('database') - user = os.getenv('user') - password = os.getenv('password') - host = os.getenv('host') - port = os.getenv('port') + def __init__(self, e): + """Initialise with provided error message.""" + self.message = str(e) + super().__init__(self.message) + +def lambda_handler(event, context): + """This lambda function connects to the Totesys database, lists the contents of the ingestion bucket, + and converts all tables to CSV and if any of those tables do not exist in, or are different to the ones in s3, it uploads them + it uses 3 helper functions to achieve these 3 functionalities + """ + try: + db = connect_to_database() + existing_files = list_existing_s3_files() + any_changes = process_and_upload_tables(db, existing_files) + + if not any_changes: + logger.info("No changes detected in the database.") + return { + 'statusCode': 200, + 'body': json.dumps('No changes detected, no CSV files were uploaded.') + } + else: + return { + 'statusCode': 200, + 'body': json.dumps('CSV files processed and uploaded successfully.') + } + + except Exception as e: + logger.error(f'Error: {e}') + return { + 'statusCode': 500, + 'body': json.dumps('Internal server error.') + } + + finally: + + if db: + db.close() + +def get_config(path: str = ".env") -> dict: + return dotenv_values(path) +def connect_to_database() -> Connection: try: - db = Connection.run( - database=database, - user=user, - password=password, - host=host, - port=port + config = get_config() + host = config["host"] + port = config["port"] + user = config["user"] + password = config["password"] + database = config["database"] + + return Connection( + database=database, + user=user, + password=password, + host=host, + port=port ) - except DatabaseError as e: - print(e) except InterfaceError as i: - print(i) + logger.error(f'Interface error: {i}') + raise DBConnectionException("Failed to connect to database") + + + +def list_existing_s3_files(bucket_name='extract_bucket', client=boto3.client('s3')): + """Creates a dictionary and populates it with the + results of listing the contents of the s3 bucket, then + returns the populated dictionary + """ + + existing_files = {} + + try: + response = client.list_objects_v2(Bucket='extract_bucket') + + if 'Contents' in response: + for obj in response['Contents']: + s3_key = obj['Key'] + try: + file_obj = client.get_object(Bucket=bucket_name, Key=s3_key) + file_content = file_obj['Body'].read().decode('utf-8') + existing_files[s3_key] = file_content + except ClientError as e: + logger.error(f'Error retrieving S3 object {s3_key}: {e}') + else: + logger.error('The bucket is empty') + + except ClientError as e: + logger.error(f'Error listing S3 objects: {e}') + + return existing_files + + + +def process_and_upload_tables(db, existing_files, client=boto3.client('s3')): + """Creates a list of the tables from a database query and + then selects everything from each table in individual queries + it then writes each table to CSV files and compares with the item + in the existing_files dictionary with the same name. If it finds any changes + to files, or new tables/files it uploads them to the s3 bucket + """ + + tables = db.run("SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';") + + for table in tables: + table_name = table[0] + rows = db.run(f"SELECT * FROM {table_name};") + + csv_file_path = f"/tmp/{table_name}.csv" + with open(csv_file_path, "w", newline='') as file: + writer = csv.writer(file) + column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")] + writer.writerow(column_names) + writer.writerows(rows) + + s3_key = f"{table_name}/{datetime.today().year}/{datetime.today().month}/{datetime.today().day}/{table_name}_{datetime.now().strftime('%H:%M:%S')}.csv" + new_csv_content = open(csv_file_path, "r").read() + -
\ No newline at end of file + if s3_key not in existing_files or existing_files[s3_key] != new_csv_content: + try: + client.upload_file(csv_file_path, 'extract_bucket', s3_key) + logger.info(f"Uploaded {s3_key} to S3.") + except ClientError as e: + logger.error(f'Error uploading to S3: {e}')
\ No newline at end of file diff --git a/tests/dummy.txt b/tests/dummy.txt new file mode 100644 index 0000000..af27ff4 --- /dev/null +++ b/tests/dummy.txt @@ -0,0 +1 @@ +This is a test file.
\ No newline at end of file diff --git a/tests/test_extract_lambda.py b/tests/test_extract_lambda.py new file mode 100644 index 0000000..74d7e2c --- /dev/null +++ b/tests/test_extract_lambda.py @@ -0,0 +1,112 @@ +import pytest +import boto3 +from moto import mock_aws +from unittest.mock import patch +from unittest import TestCase +from src.extract_lambda import list_existing_s3_files, connect_to_database, DBConnectionException, process_and_upload_tables +import os +import logging + +@pytest.fixture(scope='class') +def mock_config(): + env_vars = { + "host": "abc", + "port": "5432", + "user": "def", + "password": "password", + "database": "db", + } + with patch("src.extract_lambda.get_config", return_value=env_vars) as mock_config: + yield mock_config + + +@pytest.fixture(scope='class') +def aws_credentials(): + os.environ["AWS_ACCESS_KEY_ID"] = 'testing' + os.environ["AWS_SECRET_ACCESS_KEY"] = 'testing' + os.environ["AWS_SECURIT_TOKEN"] = 'testing' + os.environ["AWS_SESSION_TOKEN"] = 'testing' + os.environ["AWS_DEFAULT_REGION"]= 'eu-west-2' + +@pytest.fixture(scope='class') +def s3_client(aws_credentials): + with mock_aws(): + yield boto3.client('s3') + +class TestListExistingS3Files: + def test_error_if_no_bucket(self, s3_client, caplog): + + logger = logging.getLogger() + logger.info('Testing now.') + caplog.set_level(logging.ERROR) + list_existing_s3_files(client=s3_client) + assert 'Error listing S3 objects' in caplog.text + + def test_error_if_bucket_is_empty(self, s3_client, caplog): + + s3_client.create_bucket(Bucket='extract_bucket', + CreateBucketConfiguration={ + 'LocationConstraint': 'eu-west-2' + }) + list_existing_s3_files(client=s3_client) + assert 'The bucket is empty' in caplog.text + + def test_error_retrieving_object(self, s3_client, caplog): + s3_client.upload_file('tests/dummy.txt', 'extract_bucket', 'dummy.txt') + list_existing_s3_files(bucket_name='test_bucket', client=s3_client) + + assert 'Error retrieving S3 object ' in caplog.text + + def test_retrieves_file_content(self, s3_client, caplog): + result = list_existing_s3_files(client=s3_client) + + assert list(result.values()) == ['This is a test file.'] + +class TestConnectToDatabase: + def test_connect_to_database(mock_conn, mock_config): + with patch("src.extract_lambda.Connection", autospec=True) as mock_conn: + connect_to_database() + mock_conn.assert_called_with( + host="abc", user="def", port="5432", password="password", database="db" + ) + + def test_database_error(self, mock_config): + with pytest.raises(DBConnectionException): + connect_to_database() + + def test_logs_interface_error(self, caplog): + logger = logging.getLogger() + logger.info('Testing now.') + caplog.set_level(logging.ERROR) + with pytest.raises(DBConnectionException): + connect_to_database() + assert 'Interface error' in caplog.text + +class TestProcessAndUploadTables: + def test_error_process_and_upload_tables(mock_conn, mock_config, s3_client, caplog, mocker): + logger = logging.getLogger() + logger.info('Testing now.') + caplog.set_level(logging.ERROR) + + with patch("src.extract_lambda.Connection", autospec=True) as mock_conn: + mock_db = connect_to_database() + # need to add a table + s3_key = 'dummy/2024/8/14/dummy_16:46:30.txt' + mock_existing_files = mocker.Mock(return_value={s3_key: 'This is a test file.' }) + s3_client.create_bucket(Bucket='extract_bucket', + CreateBucketConfiguration={ + 'LocationConstraint': 'eu-west-2' + }) + s3_client.upload_file('tests/dummy.txt', 'extract_bucket', s3_key) + process_and_upload_tables(mock_db, mock_existing_files, client=s3_client) + + assert 'Error uploading to S3' in caplog.text + +#@pytest.mark.describe("Helpers") +# @pytest.mark.it("Query processor returns correctly formatted dict") +# def test_process_query(): +# with patch("src.api.helpers.get_db_connection") as mock_conn: +# mock_conn().run.side_effect = db_data +# mock_conn().columns = sample_headers +# result = process_query("test query") +# assert result == sample_result
\ No newline at end of file |
