aboutsummaryrefslogtreecommitdiffstats
path: root/tests/test_load_lambda.py
blob: 3e42c2ad28bdf3a1f6c4a1c981e022a59767ed5e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import pandas as pd
import pyarrow.parquet as pq
from io import BytesIO
from moto import mock_aws
import boto3
import os
import pytest
from src.load_lambda import lambda_handler, connect_to_db_and_return_engine, get_transform_bucket, convert_parquet_files_to_dfs, upload_dfs_to_database

@pytest.fixture(scope="class")
def aws_credentials():
    """Export dummy AWS credentials and a default region via ``os.environ``.

    Every credential variable is set to the placeholder value ``"testing"``
    and the default region to ``eu-west-2``. The fixture is class-scoped and
    does not restore the previous environment afterwards.
    """
    os.environ.update(
        {
            "AWS_ACCESS_KEY_ID": "testing",
            "AWS_SECRET_ACCESS_KEY": "testing",
            # Spelling fixed: this key was previously "AWS_SECURIT_TOKEN".
            "AWS_SECURITY_TOKEN": "testing",
            "AWS_SESSION_TOKEN": "testing",
            "AWS_DEFAULT_REGION": "eu-west-2",
        }
    )


@pytest.fixture(scope="class")
def mock_s3_client(aws_credentials):
    """Yield a boto3 S3 client created while ``mock_aws`` is active.

    The fixture is class-scoped, so all tests in one class receive the same
    client object; the ``mock_aws`` context is exited once the fixture is
    torn down.
    """
    aws_mock = mock_aws()
    with aws_mock:
        s3 = boto3.client("s3")
        yield s3


class TestLambdaHandler:
    """Placeholder: no tests are defined in this class yet.

    Presumably intended for ``lambda_handler`` (TODO confirm and implement).
    """

class TestRetrieveSecrets:
    """Placeholder: no tests are defined in this class yet."""

class TestConnectToDBAndReturnEngine:
    """Placeholder: no tests are defined in this class yet.

    Presumably intended for ``connect_to_db_and_return_engine`` (TODO confirm
    and implement).
    """

class TestGetTransformBucket:
    """Tests for ``get_transform_bucket``.

    NOTE: ``mock_s3_client`` is class-scoped, so all tests in this class share
    one mocked S3 state and buckets created by earlier tests are still there
    in later ones. The first two tests therefore need to run before any test
    that creates ``transform_bucket``; pytest's default definition order
    satisfies this.
    """

    _BUCKET_CONFIG = {"LocationConstraint": "eu-west-2"}

    @classmethod
    def _ensure_bucket(cls, client, name):
        """Create bucket *name* unless *client* already lists it."""
        existing = {bucket["Name"] for bucket in client.list_buckets()["Buckets"]}
        if name not in existing:
            client.create_bucket(
                Bucket=name,
                CreateBucketConfiguration=cls._BUCKET_CONFIG,
            )

    def test_raises_value_error_if_no_buckets(self, mock_s3_client):
        """With no buckets created, a ``ValueError`` is expected."""
        with pytest.raises(ValueError, match="No transform bucket found"):
            get_transform_bucket(mock_s3_client)

    def test_raises_value_error_if_no_transform_bucket(self, mock_s3_client):
        """With only ``extract_bucket`` created, a ``ValueError`` is expected."""
        self._ensure_bucket(mock_s3_client, "extract_bucket")
        with pytest.raises(ValueError, match="No transform bucket found"):
            get_transform_bucket(mock_s3_client)

    def test_returns_transform_bucket_if_one_bucket(self, mock_s3_client):
        """Once ``transform_bucket`` exists, its name is returned."""
        self._ensure_bucket(mock_s3_client, "transform_bucket")
        result = get_transform_bucket(mock_s3_client)
        assert result == "transform_bucket"

    def test_only_returns_transform_bucket_if_several_buckets(self, mock_s3_client):
        """Extra buckets do not change which name is returned."""
        # This test used to rely on "transform_bucket" having been left behind
        # by the previous test; create it here (idempotently) so the test also
        # passes when it is selected and run on its own.
        self._ensure_bucket(mock_s3_client, "transform_bucket")
        self._ensure_bucket(mock_s3_client, "another_test_bucket")
        result = get_transform_bucket(mock_s3_client)
        assert result == "transform_bucket"

class TestConvertParquetToDfs:
    """Tests for ``convert_parquet_files_to_dfs``.

    NOTE: ``mock_s3_client`` is class-scoped, so the bucket created in the
    first test is still present when the second test runs.
    """

    def test_function_returns_empty_dictionary_if_no_files(self, mock_s3_client):
        """A bucket with no objects is expected to give an empty dict."""
        mock_s3_client.create_bucket(
            Bucket="transform_bucket",
            CreateBucketConfiguration={"LocationConstraint": "eu-west-2"},
        )
        result = convert_parquet_files_to_dfs(
            bucket_name="transform_bucket", client=mock_s3_client
        )
        assert result == {}

    def test_function_returns_dictionary_with_table_with_file_key(
        self, mock_s3_client
    ):
        """The returned dict is expected to contain a ``dim_staff`` entry."""
        # Signature fixed: the method previously took no parameters, so pytest
        # could not call it and ``mock_s3_client`` in the body referred to the
        # module-level fixture function instead of a client.
        #
        # TODO: need to mock parquet file and upload to mock bucket
        # NOTE(review): nothing is uploaded yet, so the assertion below is
        # expected to fail until the TODO above is done.
        result = convert_parquet_files_to_dfs(
            bucket_name="transform_bucket", client=mock_s3_client
        )
        assert "dim_staff" in result

class TestUploadDfsToDatabase:
    """Placeholder: no tests are defined in this class yet.

    Presumably intended for ``upload_dfs_to_database`` (TODO confirm and
    implement).
    """
git.ajschof.me — hosted by ajschofield — powered by cgit