diff options
| -rw-r--r-- | .gitignore | 3 | ||||
| -rw-r--r-- | README.md | 8 | ||||
| -rwxr-xr-x | scripts/make_layer_zip.sh | 8 | ||||
| -rw-r--r-- | src/load_lambda.py | 2 | ||||
| -rw-r--r-- | src/secrets_manager.py | 31 | ||||
| -rw-r--r-- | src/transform_lambda.py | 2 | ||||
| -rw-r--r-- | terraform/iam.tf | 17 | ||||
| -rw-r--r-- | terraform/lambda.tf | 73 | ||||
| -rw-r--r-- | tests/test_extract_lambda.py | 127 | ||||
| -rw-r--r-- | tests/test_secrets_manager.py | 37 |
10 files changed, 198 insertions, 110 deletions
@@ -14,5 +14,4 @@ __pycache__/ # OS-Related Files .DS_Store - -*venv* +venv
\ No newline at end of file @@ -1,5 +1,13 @@ # ToteSys - Data Engineering Project +[](https://www.python.org/) +[](https://aws.amazon.com/) +[](https://www.terraform.io/) +[](https://www.postgresql.org/) +[](https://github.com/features/actions) + +[](https://github.com/ajschofield/de-project-bentley/actions/workflows/deploy.yml?query=branch%3Amain) +[](https://github.com/ajschofield/de-project-bentley/deployments/production) # Summary The project aims to implement a data platform that can extract data from an operational database, archive it in a data lake, and make it easily accessible diff --git a/scripts/make_layer_zip.sh b/scripts/make_layer_zip.sh new file mode 100755 index 0000000..eabe301 --- /dev/null +++ b/scripts/make_layer_zip.sh @@ -0,0 +1,8 @@ +# Description: Make the zip file for the layer + +cd "$(dirname "$0")/.." +mkdir -p python/lib/python3.11/site-packages +pip3 install --upgrade -r requirements.txt -t python/lib/python3.11/site-packages +rm layer.zip +zip -r layer.zip python +rm -r python/ diff --git a/src/load_lambda.py b/src/load_lambda.py index 6ee681f..c6a8e60 100644 --- a/src/load_lambda.py +++ b/src/load_lambda.py @@ -1,2 +1,2 @@ def lambda_handler(): - pass
\ No newline at end of file + pass diff --git a/src/secrets_manager.py b/src/secrets_manager.py index c0fb61e..3484688 100644 --- a/src/secrets_manager.py +++ b/src/secrets_manager.py @@ -4,45 +4,46 @@ import json def sm_client(): - sm_client = boto3.client('secretsmanager') + sm_client = boto3.client("secretsmanager") yield sm_client -def create_secret(sm_client, secret_name, cohort_id, user, password, host, database, port): + +def create_secret( + sm_client, secret_name, cohort_id, user, password, host, database, port +): secret = { "cohort_id": cohort_id, "user": user, "password": password, "host": host, "database": database, - "port": port + "port": port, } response = sm_client.create_secret( - Name = secret_name, - SecretString = json.dumps(secret) + Name=secret_name, SecretString=json.dumps(secret) ) print(response) return response + def list_secret(sm_client): response = sm_client.list_secrets() - secret_dict = response['SecretList'] + secret_dict = response["SecretList"] secret_names = [] for items in secret_dict: - secret_names.append(items['Name']) - print(f'{len(secret_names)} secret(s) available') + secret_names.append(items["Name"]) + print(f"{len(secret_names)} secret(s) available") for name in secret_names: print(name) return secret_names -def retrieve_secrets(sm_client): - response = sm_client.get_secrets( - - ) +def retrieve_secrets(sm_client): + response = sm_client.get_secrets() -#retrieve secret -#so lambda can access totesy db -#so lambda connect to the db and then retrieve the data
\ No newline at end of file +# retrieve secret +# so lambda can access totesy db +# so lambda connect to the db and then retrieve the data diff --git a/src/transform_lambda.py b/src/transform_lambda.py index 6ee681f..c6a8e60 100644 --- a/src/transform_lambda.py +++ b/src/transform_lambda.py @@ -1,2 +1,2 @@ def lambda_handler(): - pass
\ No newline at end of file + pass diff --git a/terraform/iam.tf b/terraform/iam.tf index 0e5fa6d..7585ff8 100644 --- a/terraform/iam.tf +++ b/terraform/iam.tf @@ -28,17 +28,19 @@ resource "aws_iam_role" "multi_service_role" { ######################################################################## # S3 SETUP # Description: allows allows retention/tagging/access control settings -# Lambda IAM Policy for S3 Write +# Lambda IAM Policy for S3 ######################################################################## # S3 DEFINE POLICY data "aws_iam_policy_document" "s3_data_policy_doc" { statement { + effect = "Allow" actions = [ "s3:PutObject", "s3:PutObjectRetention", "s3:PutObjectTagging", - "s3:PutObjectAcl" + "s3:PutObjectAcl", + "s3:ListObjects" ] resources = [ "${aws_s3_bucket.extract_bucket.arn}/*", @@ -46,6 +48,17 @@ data "aws_iam_policy_document" "s3_data_policy_doc" { "${aws_s3_bucket.lambda_code_bucket.arn}/*", ] } + + statement { + effect = "Allow" + actions = [ + "s3:ListBuckets", + "s3:ListAllMyBuckets" + ] + resources = [ + "arn:aws:s3:::*", + ] + } } diff --git a/terraform/lambda.tf b/terraform/lambda.tf index 67fd6eb..72aae04 100644 --- a/terraform/lambda.tf +++ b/terraform/lambda.tf @@ -12,12 +12,14 @@ resource "aws_s3_object" "extract_lambda_code" { } resource "aws_lambda_function" "extract_lambda" { - function_name = var.extract_lambda_name - s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket - s3_key = aws_s3_object.extract_lambda_code.key - role = aws_iam_role.multi_service_role.arn - handler = "extract_lambda.extract" - runtime = "python3.11" + function_name = var.extract_lambda_name + s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket + s3_key = aws_s3_object.extract_lambda_code.key + layers = [aws_lambda_layer_version.lambda_layer.arn] + role = aws_iam_role.multi_service_role.arn + handler = "extract_lambda.lambda_handler" + runtime = "python3.11" + source_code_hash = data.archive_file.extract_lambda_zip.output_base64sha256 lifecycle { create_before_destroy = true @@ -40,12 +42,14 @@ resource "aws_s3_object" "transform_lambda_code" { } resource "aws_lambda_function" "transform_lambda" { - function_name = var.transform_lambda_name - s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket - s3_key = aws_s3_object.transform_lambda_code.key - role = aws_iam_role.multi_service_role.arn - handler = "transform_lambda.transform" - runtime = "python3.11" + function_name = var.transform_lambda_name + s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket + s3_key = aws_s3_object.transform_lambda_code.key + layers = [aws_lambda_layer_version.lambda_layer.arn] + role = aws_iam_role.multi_service_role.arn + handler = "transform_lambda.lambda_handler" + runtime = "python3.11" + source_code_hash = data.archive_file.transform_lambda_zip.output_base64sha256 lifecycle { create_before_destroy = true @@ -68,12 +72,14 @@ resource "aws_s3_object" "load_lambda_code" { } resource "aws_lambda_function" "load_lambda" { - function_name = var.load_lambda_name - s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket - s3_key = aws_s3_object.load_lambda_code.key - role = aws_iam_role.multi_service_role.arn - handler = "load_lambda.load" - runtime = "python3.11" + function_name = var.load_lambda_name + s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket + s3_key = aws_s3_object.load_lambda_code.key + layers = [aws_lambda_layer_version.lambda_layer.arn] + role = aws_iam_role.multi_service_role.arn + handler = "load_lambda.lambda_handler" + runtime = "python3.11" + source_code_hash = data.archive_file.load_lambda_zip.output_base64sha256 lifecycle { create_before_destroy = true @@ -82,37 +88,32 @@ resource "aws_lambda_function" "load_lambda" { depends_on = [aws_s3_object.load_lambda_code] } +# Lambda Layer Specification locals { - layer_dir = "${path.module}/.." - requirements = "${path.module}/../requirements.txt" - layer_zip = "${path.module}/../layer.zip" + layer_dir = "../" + layer_zip = "layer.zip" + layer_name = "lambda_layer" + script_dir = "../scripts" } resource "null_resource" "prepare_layer" { - triggers = { - requirements_hash = filesha1(local.requirements) - } provisioner "local-exec" { - command = <<EOT - mkdir -p ${local.layer_dir}/python/lib/python3.11/site-packages/ - pip install -r ${local.requirements} -t ${local.layer_dir}/python/lib/python3.11/site-packages/ - cd ${local.layer_dir} && zip -r ${local.layer_zip} . - EOT + command = "bash ${local.script_dir}/make_layer_zip.sh" } } -resource "aws_s3_object" "layer_zip" { - bucket = aws_s3_bucket.lambda_code_bucket.bucket - key = "layer.zip" - source = local.layer_zip +resource "aws_s3_object" "lambda_layer_zip" { + bucket = aws_s3_bucket.lambda_code_bucket.id #bucket instead of id + key = "${local.layer_name}/${local.layer_zip}" + source = "${local.layer_dir}/${local.layer_zip}" depends_on = [null_resource.prepare_layer] } resource "aws_lambda_layer_version" "lambda_layer" { - layer_name = "lambda_layer" + layer_name = local.layer_name compatible_runtimes = ["python3.11"] s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket - s3_key = aws_s3_object.layer_zip.key + s3_key = aws_s3_object.lambda_layer_zip.key skip_destroy = true - depends_on = [aws_s3_object.layer_zip] + depends_on = [aws_s3_object.lambda_layer_zip] } diff --git a/tests/test_extract_lambda.py b/tests/test_extract_lambda.py index 02e3d3c..b1894cc 100644 --- a/tests/test_extract_lambda.py +++ b/tests/test_extract_lambda.py @@ -5,15 +5,16 @@ import boto3 from moto import mock_aws from unittest.mock import patch, MagicMock from unittest import TestCase +import os +import logging +import json from src.extract_lambda import ( list_existing_s3_files, connect_to_database, DBConnectionException, + lambda_handler, process_and_upload_tables, - extract_bucket, ) -import logging -import os @pytest.fixture(scope="class") @@ -54,6 +55,88 @@ def s3_mock_bucket(s3_client): ) return bucket +class TestLambdaHandler: + def test_lambda_handler_files_processed_and_uploaded_successfully(self, mocker): + mock_db = MagicMock() + mock_db.run.side_effect = [ + [["Fruits"]], + [["Vegetable", "Sour", "Green"], ["Berry", "Sweet", "Red"]], + [["Food_type"], ["Flavour"], ["Colour"]], + ] + mock_db.columns.return_value = [ + {"name": "Food_type"}, + {"name": "Flavour"}, + {"name": "Colour"}, + ] + with patch("src.extract_lambda.connect_to_database", return_value=mock_db): + mock_process_and_upload_tables = mocker.patch( + "src.extract_lambda.process_and_upload_tables", return_value=mock_db + ) + mock_list_existing_s3_files = mocker.patch( + "src.extract_lambda.list_existing_s3_files", return_value={} + ) + event = {} + context = {} + response = lambda_handler(event, context) + assert response["statusCode"] == 200 + assert ( + json.loads(response["body"]) + == "CSV files processed and uploaded successfully." + ) + mock_list_existing_s3_files.assert_called_once() + mock_process_and_upload_tables.assert_called_once_with(mock_db, {}) + mock_db.close.assert_called_once() + + def test_lambda_handler_no_changes_detected_no_files_uploaded(self, mocker): + mock_db = MagicMock() + mock_db.run.side_effect = [ + [["Fruits"]], + [["Vegetable", "Sour", "Green"], ["Berry", "Sweet", "Red"]], + [["Food_type"], ["Flavour"], ["Colour"]], + ] + mock_db.columns.return_value = [ + {"name": "Food_type"}, + {"name": "Flavour"}, + {"name": "Colour"}, + ] + + with patch("src.extract_lambda.connect_to_database", return_value=mock_db): + mock_process_and_upload_tables = mocker.patch( + "src.extract_lambda.process_and_upload_tables", return_value=False + ) + mock_list_existing_s3_files = mocker.patch( + "src.extract_lambda.list_existing_s3_files", return_value={} + ) + event = {} + context = {} + response = lambda_handler(event, context) + assert response["statusCode"] == 200 + assert ( + json.loads(response["body"]) + == "No changes detected, no CSV files were uploaded." + ) + mock_list_existing_s3_files.assert_called_once() + mock_process_and_upload_tables.assert_called_once_with(mock_db, {}) + mock_db.close.assert_called_once() + + def test_lambda_handler_exception_error(self, mocker): + with patch( + "src.extract_lambda.connect_to_database", + side_effect=Exception("Database connection error"), + ): + mock_process_and_upload_tables = mocker.patch( + "src.extract_lambda.process_and_upload_tables" + ) + mock_list_existing_s3_files = mocker.patch( + "src.extract_lambda.list_existing_s3_files" + ) + event = {} + context = {} + response = lambda_handler(event, context) + assert response["statusCode"] == 500 + assert json.loads(response["body"]) == "Internal server error." + mock_list_existing_s3_files.assert_not_called() + mock_process_and_upload_tables.assert_not_called() class TestListExistingS3Files: def test_error_if_no_bucket(self, s3_client, caplog): @@ -67,17 +150,9 @@ class TestListExistingS3Files: list_existing_s3_files("extract_bucket", client=s3_client) assert "The bucket is empty" in caplog.text - # def test_error_retrieving_object(self, s3_client, caplog, s3_mock_bucket): - # s3_client.upload_file('tests/dummy.txt', 'extract_bucket', 'dummy.txt') - - # list_existing_s3_files(bucket_name='extract_bucket', client=s3_client) - - # assert 'Error retrieving S3 object dummy.txt: ClientError' in caplog.text - def test_retrieves_file_content(self, s3_client, caplog, s3_mock_bucket): s3_client.upload_file("tests/dummy.txt", "extract_bucket", "dummy.txt") result = list_existing_s3_files("extract_bucket", client=s3_client) - assert list(result.values()) == ["This is a test file."] @@ -100,32 +175,4 @@ class TestConnectToDatabase: caplog.set_level(logging.ERROR) with pytest.raises(DBConnectionException): connect_to_database() - assert "Interface error" in caplog.text - - -""" -class TestProcessAndUploadTables: - def test_error_process_and_upload_tables(mock_conn, mock_config, s3_client, caplog): - logger = logging.getLogger() - logger.info('Testing now.') - caplog.set_level(logging.ERROR) - #### - queries = ["SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';", - "SELECT * FROM Fruits;", - "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = 'Fruits'"] - return_values = [[['Fruits']], - [['Vegetable','Sour','Green'],['Berry','Sweet','Red']], - [['Food_type'],['Flavour'],['Colour']]] - vals = dict(zip(queries,return_values)) - - #### - with patch('src.extract_lambda.connect_to_database') as mock_db: - mock_db().run.side_effects = return_values - s3_key = 'Fruits/2024/08/15/Fruits_16:46:30.csv' - existing_files = {s3_key: 'Food_type,Flavour,Colour\nFruit,Sour,Green\nBerry,Sweet,Red'} - s3_client.create_bucket(Bucket='extract_bucket', - CreateBucketConfiguration={'LocationConstraint': 'eu-west-2'}) - s3_client.upload_file('tests/dummy_identical.csv', 'extract_bucket', s3_key) - process_and_upload_tables(mock_db(), existing_files, client=s3_client) - assert 'No new data.' in caplog.text -""" + assert "Interface error" in caplog.text
\ No newline at end of file diff --git a/tests/test_secrets_manager.py b/tests/test_secrets_manager.py index a30be86..609c572 100644 --- a/tests/test_secrets_manager.py +++ b/tests/test_secrets_manager.py @@ -3,10 +3,11 @@ import boto3 import botocore.exceptions from moto import mock_aws import json -import pytest +import pytest import os -@pytest.fixture(scope='function') + +@pytest.fixture(scope="function") def aws_credentials(): """Mocked AWS Credentials for moto.""" os.environ["AWS_ACCESS_KEY_ID"] = "testing" @@ -15,12 +16,14 @@ def aws_credentials(): os.environ["AWS_SESSION_TOKEN"] = "testing" os.environ["AWS_DEFAULT_REGION"] = "eu-west-2" -@pytest.fixture(scope='function') + +@pytest.fixture(scope="function") def mock_sm_client(aws_credentials): with mock_aws(): yield boto3.client("secretsmanager") -@pytest.fixture(scope='function') + +@pytest.fixture(scope="function") def mock_store_secret(mock_sm_client): secret = { "cohort_id": "test_cohort_id", @@ -28,15 +31,18 @@ def mock_store_secret(mock_sm_client): "password": "test_password", "host": "test_host", "database": "test_database", - "port": "test_port" + "port": "test_port", } secret_name = "test_secret" - response = mock_sm_client.create_secret(Name=secret_name, SecretString=json.dumps(secret)) + response = mock_sm_client.create_secret( + Name=secret_name, SecretString=json.dumps(secret) + ) return response + def test_retrieves_secrets_returns_dictionary(mock_sm_client, mock_store_secret): secret_name = "test_secret" @@ -44,8 +50,10 @@ def test_retrieves_secrets_returns_dictionary(mock_sm_client, mock_store_secret) assert isinstance(result, dict) -def test_retrieves_secrets_returns_correct_keys_and_values(mock_sm_client, mock_store_secret): +def test_retrieves_secrets_returns_correct_keys_and_values( + mock_sm_client, mock_store_secret +): secret_name = "test_secret" result = retrieve_secrets(mock_sm_client, secret_name) @@ -57,17 +65,20 @@ def test_retrieves_secrets_returns_correct_keys_and_values(mock_sm_client, mock_ assert result["database"] == "test_database" assert result["port"] == "test_port" -def test_retrieves_secrets_raises_error_if_secret_name_incorrect_data_type(mock_sm_client): - secret_name = [1, 2, 3] +def test_retrieves_secrets_raises_error_if_secret_name_incorrect_data_type( + mock_sm_client, +): + secret_name = [1, 2, 3] with pytest.raises(botocore.exceptions.ParamValidationError) as error: retrieve_secrets(mock_sm_client, secret_name) -def test_retrieves_secrets_raises_error_if_secret_name_does_not_exist(mock_sm_client, mock_store_secret): - secret_name = 'test_secret_2' - +def test_retrieves_secrets_raises_error_if_secret_name_does_not_exist( + mock_sm_client, mock_store_secret +): + secret_name = "test_secret_2" with pytest.raises(botocore.exceptions.ClientError) as error: - retrieve_secrets(mock_sm_client, secret_name)
\ No newline at end of file + retrieve_secrets(mock_sm_client, secret_name) |
