aboutsummaryrefslogtreecommitdiffstats
path: root/tests/test_extract_lambda.py
blob: 877e36ae6a69c26964a81ebc679a29c52264eb05 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import pytest
import boto3
from moto import mock_aws
from unittest.mock import patch, MagicMock
from unittest import TestCase
from src.extract_lambda import (
    list_existing_s3_files,
    connect_to_database,
    DBConnectionException,
    process_and_upload_tables,
)
import os
import logging


@pytest.fixture(scope="class")
def mock_config():
    env_vars = {
        "host": "abc",
        "port": "5432",
        "user": "def",
        "password": "password",
        "database": "db",
    }
    with patch("src.extract_lambda.get_config", return_value=env_vars) as mock_config:
        yield mock_config


@pytest.fixture(scope="class")
def aws_credentials():
    os.environ["AWS_ACCESS_KEY_ID"] = "testing"
    os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
    os.environ["AWS_SECURIT_TOKEN"] = "testing"
    os.environ["AWS_SESSION_TOKEN"] = "testing"
    os.environ["AWS_DEFAULT_REGION"] = "eu-west-2"


@pytest.fixture(scope="class")
def s3_client(aws_credentials):
    with mock_aws():
        yield boto3.client("s3")


class TestListExistingS3Files:
    def test_error_if_no_bucket(self, s3_client, caplog):
        logger = logging.getLogger()
        logger.info("Testing now.")
        caplog.set_level(logging.ERROR)
        list_existing_s3_files(client=s3_client)
        assert "Error listing S3 objects" in caplog.text

    def test_error_if_bucket_is_empty(self, s3_client, caplog):
        s3_client.create_bucket(
            Bucket="extract_bucket",
            CreateBucketConfiguration={"LocationConstraint": "eu-west-2"},
        )
        list_existing_s3_files(client=s3_client)
        assert "The bucket is empty" in caplog.text

    def test_error_retrieving_object(self, s3_client, caplog):
        s3_client.upload_file("tests/dummy.txt", "extract_bucket", "dummy.txt")
        list_existing_s3_files(bucket_name="test_bucket", client=s3_client)

        assert "Error retrieving S3 object " in caplog.text

    def test_retrieves_file_content(self, s3_client, caplog):
        result = list_existing_s3_files(client=s3_client)

        assert list(result.values()) == ["This is a test file."]


class TestConnectToDatabase:
    def test_connect_to_database(mock_conn, mock_config):
        with patch("src.extract_lambda.Connection", autospec=True) as mock_conn:
            connect_to_database()
            mock_conn.assert_called_with(
                host="abc", user="def", port="5432", password="password", database="db"
            )

    def test_database_error(self, mock_config):
        with pytest.raises(DBConnectionException):
            connect_to_database()

    def test_logs_interface_error(self, caplog):
        logger = logging.getLogger()
        logger.info("Testing now.")
        caplog.set_level(logging.ERROR)
        with pytest.raises(DBConnectionException):
            connect_to_database()
        assert "Interface error" in caplog.text


"""
class TestProcessAndUploadTables:
    def test_error_process_and_upload_tables(mock_conn, mock_config, s3_client, caplog):
        logger = logging.getLogger()
        logger.info('Testing now.')
        caplog.set_level(logging.ERROR)
        ####
        queries = ["SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';",
                    "SELECT * FROM Fruits;",
                    "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = 'Fruits'"]
        return_values = [[['Fruits']],
                        [['Vegetable','Sour','Green'],['Berry','Sweet','Red']],
                        [['Food_type'],['Flavour'],['Colour']]]
        vals = dict(zip(queries,return_values))

        ####
        with patch('src.extract_lambda.connect_to_database') as mock_db:
            mock_db().run.side_effects = return_values
            s3_key = 'Fruits/2024/08/15/Fruits_16:46:30.csv'
            existing_files = {s3_key: 'Food_type,Flavour,Colour\nFruit,Sour,Green\nBerry,Sweet,Red'}
            s3_client.create_bucket(Bucket='extract_bucket', 
                        CreateBucketConfiguration={'LocationConstraint': 'eu-west-2'})
            s3_client.upload_file('tests/dummy_identical.csv', 'extract_bucket', s3_key)
            process_and_upload_tables(mock_db(), existing_files, client=s3_client)
            assert 'No new data.' in caplog.text
"""
git.ajschof.me — hosted by ajschofield — powered by cgit