aboutsummaryrefslogtreecommitdiffstats
path: root/tests/test_extract_lambda.py
diff options
context:
space:
mode:
authorAlex <git@ajschof.me>2024-08-19 11:52:31 +0100
committerGitHub <noreply@github.com>2024-08-19 11:52:31 +0100
commit8091f94af52518e8a586479c5fc35051f61b493c (patch)
tree61bd3b1e00340e85117e71d4f0714423603ccaeb /tests/test_extract_lambda.py
parent71e967e7ccb0a7e486cba1555f182cec03f4cbe4 (diff)
parent24a4573d6cf64ec0383ae16bfba09a0ffdb8c129 (diff)
downloadde-project-bentley-8091f94af52518e8a586479c5fc35051f61b493c.tar.gz
de-project-bentley-8091f94af52518e8a586479c5fc35051f61b493c.zip
Merge pull request #57 from ajschofield/test-extract-v2
pr: update extract_lambda tests
Diffstat (limited to 'tests/test_extract_lambda.py')
-rw-r--r--tests/test_extract_lambda.py155
1 files changed, 125 insertions, 30 deletions
diff --git a/tests/test_extract_lambda.py b/tests/test_extract_lambda.py
index e94a8a4..67cb6d3 100644
--- a/tests/test_extract_lambda.py
+++ b/tests/test_extract_lambda.py
@@ -3,11 +3,19 @@ import boto3
from moto import mock_aws
from unittest.mock import patch, MagicMock
from unittest import TestCase
-from src.extract_lambda import list_existing_s3_files, connect_to_database, DBConnectionException, process_and_upload_tables
-import os
+from src.extract_lambda import (
+ list_existing_s3_files,
+ connect_to_database,
+ DBConnectionException,
+ lambda_handler,
+ process_and_upload_tables,
+)
+import os
import logging
+import json
-@pytest.fixture(scope='class')
+
+@pytest.fixture(scope="class")
def mock_config():
env_vars = {
"host": "abc",
@@ -20,54 +28,139 @@ def mock_config():
yield mock_config
-@pytest.fixture(scope='class')
+@pytest.fixture(scope="class")
def aws_credentials():
- os.environ["AWS_ACCESS_KEY_ID"] = 'testing'
- os.environ["AWS_SECRET_ACCESS_KEY"] = 'testing'
- os.environ["AWS_SECURIT_TOKEN"] = 'testing'
- os.environ["AWS_SESSION_TOKEN"] = 'testing'
- os.environ["AWS_DEFAULT_REGION"]= 'eu-west-2'
+ os.environ["AWS_ACCESS_KEY_ID"] = "testing"
+ os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
+ os.environ["AWS_SECURIT_TOKEN"] = "testing"
+ os.environ["AWS_SESSION_TOKEN"] = "testing"
+ os.environ["AWS_DEFAULT_REGION"] = "eu-west-2"
+
-@pytest.fixture(scope='class')
+@pytest.fixture(scope="class")
def s3_client(aws_credentials):
with mock_aws():
- yield boto3.client('s3')
+ yield boto3.client("s3")
+
+
+class TestLambdaHandler:
+ def test_lambda_handler_files_processed_and_uploaded_successfully(self, mocker):
+ mock_db = MagicMock()
+ mock_db.run.side_effect = [
+ [["Fruits"]],
+ [["Vegetable", "Sour", "Green"], ["Berry", "Sweet", "Red"]],
+ [["Food_type"], ["Flavour"], ["Colour"]],
+ ]
+ mock_db.columns.return_value = [
+ {"name": "Food_type"},
+ {"name": "Flavour"},
+ {"name": "Colour"},
+ ]
+ with patch("src.extract_lambda.connect_to_database", return_value=mock_db):
+ mock_process_and_upload_tables = mocker.patch(
+ "src.extract_lambda.process_and_upload_tables", return_value=mock_db
+ )
+ mock_list_existing_s3_files = mocker.patch(
+ "src.extract_lambda.list_existing_s3_files", return_value={}
+ )
+ event = {}
+ context = {}
+ response = lambda_handler(event, context)
+ assert response["statusCode"] == 200
+ assert (
+ json.loads(response["body"])
+ == "CSV files processed and uploaded successfully."
+ )
+ mock_list_existing_s3_files.assert_called_once()
+ mock_process_and_upload_tables.assert_called_once_with(mock_db, {})
+ mock_db.close.assert_called_once()
+
+ def test_lambda_handler_no_changes_detected_no_files_uploaded(self, mocker):
+ mock_db = MagicMock()
+ mock_db.run.side_effect = [
+ [["Fruits"]],
+ [["Vegetable", "Sour", "Green"], ["Berry", "Sweet", "Red"]],
+ [["Food_type"], ["Flavour"], ["Colour"]],
+ ]
+ mock_db.columns.return_value = [
+ {"name": "Food_type"},
+ {"name": "Flavour"},
+ {"name": "Colour"},
+ ]
+
+ with patch("src.extract_lambda.connect_to_database", return_value=mock_db):
+ mock_process_and_upload_tables = mocker.patch(
+ "src.extract_lambda.process_and_upload_tables", return_value=False
+ )
+ mock_list_existing_s3_files = mocker.patch(
+ "src.extract_lambda.list_existing_s3_files", return_value={}
+ )
+ event = {}
+ context = {}
+ response = lambda_handler(event, context)
+ assert response["statusCode"] == 200
+ assert (
+ json.loads(response["body"])
+ == "No changes detected, no CSV files were uploaded."
+ )
+ mock_list_existing_s3_files.assert_called_once()
+ mock_process_and_upload_tables.assert_called_once_with(mock_db, {})
+ mock_db.close.assert_called_once()
+
+ def test_lambda_handler_exception_error(self, mocker):
+ with patch(
+ "src.extract_lambda.connect_to_database",
+ side_effect=Exception("Database connection error"),
+ ):
+ mock_process_and_upload_tables = mocker.patch(
+ "src.extract_lambda.process_and_upload_tables"
+ )
+ mock_list_existing_s3_files = mocker.patch(
+ "src.extract_lambda.list_existing_s3_files"
+ )
+ event = {}
+ context = {}
+ response = lambda_handler(event, context)
+ assert response["statusCode"] == 500
+ assert json.loads(response["body"]) == "Internal server error."
+ mock_list_existing_s3_files.assert_not_called()
+ mock_process_and_upload_tables.assert_not_called()
+
class TestListExistingS3Files:
def test_error_if_no_bucket(self, s3_client, caplog):
-
logger = logging.getLogger()
- logger.info('Testing now.')
+ logger.info("Testing now.")
caplog.set_level(logging.ERROR)
list_existing_s3_files(client=s3_client)
- assert 'Error listing S3 objects' in caplog.text
+ assert "Error listing S3 objects" in caplog.text
def test_error_if_bucket_is_empty(self, s3_client, caplog):
-
- s3_client.create_bucket(Bucket='extract_bucket',
- CreateBucketConfiguration={
- 'LocationConstraint': 'eu-west-2'
- })
+ s3_client.create_bucket(
+ Bucket="extract_bucket",
+ CreateBucketConfiguration={"LocationConstraint": "eu-west-2"},
+ )
list_existing_s3_files(client=s3_client)
- assert 'The bucket is empty' in caplog.text
+ assert "The bucket is empty" in caplog.text
def test_error_retrieving_object(self, s3_client, caplog):
- s3_client.upload_file('tests/dummy.txt', 'extract_bucket', 'dummy.txt')
- list_existing_s3_files(bucket_name='test_bucket', client=s3_client)
+ s3_client.upload_file("tests/dummy.txt", "extract_bucket", "dummy.txt")
+ list_existing_s3_files(bucket_name="test_bucket", client=s3_client)
- assert 'Error retrieving S3 object ' in caplog.text
+ assert "Error retrieving S3 object " in caplog.text
def test_retrieves_file_content(self, s3_client, caplog):
result = list_existing_s3_files(client=s3_client)
- assert list(result.values()) == ['This is a test file.']
+ assert list(result.values()) == ["This is a test file."]
+
class TestConnectToDatabase:
def test_connect_to_database(mock_conn, mock_config):
- with patch("src.extract_lambda.Connection", autospec=True) as mock_conn:
+ with patch("src.extract_lambda.Connection", autospec=True) as mock_conn:
connect_to_database()
mock_conn.assert_called_with(
- host="abc", user="def", port="5432", password="password", database="db"
+ host="abc", user="def", port="5432", password="password", database="db"
)
def test_database_error(self, mock_config):
@@ -76,12 +169,14 @@ class TestConnectToDatabase:
def test_logs_interface_error(self, caplog):
logger = logging.getLogger()
- logger.info('Testing now.')
+ logger.info("Testing now.")
caplog.set_level(logging.ERROR)
with pytest.raises(DBConnectionException):
connect_to_database()
- assert 'Interface error' in caplog.text
-'''
+ assert "Interface error" in caplog.text
+
+
+"""
class TestProcessAndUploadTables:
def test_error_process_and_upload_tables(mock_conn, mock_config, s3_client, caplog):
logger = logging.getLogger()
@@ -106,4 +201,4 @@ class TestProcessAndUploadTables:
s3_client.upload_file('tests/dummy_identical.csv', 'extract_bucket', s3_key)
process_and_upload_tables(mock_db(), existing_files, client=s3_client)
assert 'No new data.' in caplog.text
-''' \ No newline at end of file
+"""
git.ajschof.me — hosted by ajschofield — powered by cgit