aboutsummaryrefslogtreecommitdiffstats
path: root/tests/test_transform_lambda.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_transform_lambda.py')
-rw-r--r--tests/test_transform_lambda.py191
1 files changed, 191 insertions, 0 deletions
diff --git a/tests/test_transform_lambda.py b/tests/test_transform_lambda.py
new file mode 100644
index 0000000..35d7e3c
--- /dev/null
+++ b/tests/test_transform_lambda.py
@@ -0,0 +1,191 @@
+from src.transform_lambda.transform_lambda import (
+ read_from_s3_subfolder_to_df,
+ list_existing_s3_files,
+ bucket_name,
+ process_to_parquet_and_upload_to_s3,
+)
+from moto import mock_aws
+import pytest
+import pandas as pd
+import os
+import boto3
+from botocore.exceptions import ClientError
+import numpy as np
+
+# import caplog
+import logging
+
+
# Root logger for this test module, set to emit INFO-level records and above.
logger = logging.getLogger()
logger.setLevel(level=logging.INFO)
+
+
@pytest.fixture(scope="class")
def aws_credentials():
    """Export dummy AWS credentials and a default region for the mocked tests.

    The values are written directly into ``os.environ`` and are not restored
    when the fixture goes out of scope.
    """
    os.environ["AWS_ACCESS_KEY_ID"] = "testing"
    os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
    # Fixed: this key was previously misspelt "AWS_SECURIT_TOKEN", so the
    # intended AWS_SECURITY_TOKEN variable was never actually set.
    os.environ["AWS_SECURITY_TOKEN"] = "testing"
    os.environ["AWS_SESSION_TOKEN"] = "testing"
    os.environ["AWS_DEFAULT_REGION"] = "eu-west-2"
+
+
@pytest.fixture(scope="class")
def s3_client(aws_credentials):
    """Yield a boto3 S3 client created inside moto's ``mock_aws`` context.

    The mock context stays open until the fixture is torn down, i.e. for the
    lifetime of the requesting test class.
    """
    with mock_aws():
        client = boto3.client("s3")
        yield client
+
+
@pytest.fixture(scope="class")
def mock_extract_bucket(s3_client):
    """Create the ``dummy_extract_buc`` bucket in eu-west-2.

    Returns the raw ``create_bucket`` response, not the bucket name.
    """
    return s3_client.create_bucket(
        Bucket="dummy_extract_buc",
        CreateBucketConfiguration={"LocationConstraint": "eu-west-2"},
    )
+
+
@pytest.fixture(scope="class")
def mock_transform_bucket(s3_client):
    """Create the ``dummy_transform_buc`` bucket in eu-west-2.

    Returns the raw ``create_bucket`` response, not the bucket name.
    """
    return s3_client.create_bucket(
        Bucket="dummy_transform_buc",
        CreateBucketConfiguration={"LocationConstraint": "eu-west-2"},
    )
+
+
class TestReadFromS3:
    """Tests for ``read_from_s3_subfolder_to_df`` against the mocked extract bucket."""

    @staticmethod
    def _expected_foods_df():
        """Return the frame expected from ``tests/dummy_identical.csv``."""
        return pd.DataFrame(
            [
                ["Vegetable", "Sour", "Green", "2022-11-03 14:20:49.962"],
                ["Berry", "Sweet", "Red", "2022-11-03 14:20:49.962"],
            ],
            columns=["Food_type", "Flavour", "Colour", "last_updated"],
        )

    @staticmethod
    def _upload_foods_csv(s3_client):
        """Upload the Foods fixture CSV; re-uploading overwrites the same key."""
        s3_client.upload_file(
            "tests/dummy_identical.csv",
            "dummy_extract_buc",
            "Foods/2024/08/21/Foods_12:03:10.csv",
        )

    def test_returns_dictionary_with_correct_value_pair(
        self, s3_client, mock_extract_bucket
    ):
        """A single table yields a dict mapping its name to the CSV's DataFrame."""
        self._upload_foods_csv(s3_client)
        tables = ["Foods"]

        result = read_from_s3_subfolder_to_df(
            tables, bucket="dummy_extract_buc", client=s3_client
        )

        assert isinstance(result, dict)
        assert list(result.keys())[0] == "Foods"
        assert isinstance(result["Foods"], pd.DataFrame)
        assert (
            result["Foods"]
            .eq(self._expected_foods_df(), axis="columns")
            .all(axis=None)
        )

    def test_returns_dictionary_of_dataframes_for_multiple_tables(
        self, s3_client, mock_extract_bucket
    ):
        """Several tables yield one dict entry per table, in request order."""
        # Upload the Foods file here as well so this test does not rely on
        # the previous test having run first against the shared bucket.
        self._upload_foods_csv(s3_client)
        s3_client.upload_file(
            "tests/dummy_2.csv",
            "dummy_extract_buc",
            "Cars/2024/08/21/Cars_14:03:56.csv",
        )
        tables = ["Foods", "Cars"]

        result = read_from_s3_subfolder_to_df(
            tables, bucket="dummy_extract_buc", client=s3_client
        )

        assert list(result.keys()) == tables
        assert (
            result["Foods"]
            .eq(self._expected_foods_df(), axis="columns")
            .all(axis=None)
        )
        assert isinstance(result["Cars"], pd.DataFrame)
        # TODO: the content comparison for "Cars" (columns Car_type, Brand,
        # Colour) was disabled in the original test; restore it once the
        # expected contents of tests/dummy_2.csv are confirmed.
+
+
class TestListExistingFiles:
    """Tests for ``list_existing_s3_files`` against mocked S3 buckets."""

    def test_functions_receives_error_if_no_bucket(self, s3_client, caplog):
        """A missing bucket raises ``ClientError`` and logs the failure."""
        caplog.set_level(logging.INFO)

        with pytest.raises(ClientError):
            list_existing_s3_files("rando_bucket", client=s3_client)

        assert (
            "Error listing S3 objects: An error occurred (NoSuchBucket) "
            "when calling the ListObjectsV2 operation: "
            "The specified bucket does not exist"
        ) in caplog.text

    def test_recieves_logger_error_if_no_files_listed(self, s3_client, caplog):
        """An empty bucket produces the 'The bucket is empty' log message."""
        caplog.set_level(logging.INFO)
        s3_client.create_bucket(
            Bucket="mock_bucket",
            CreateBucketConfiguration={"LocationConstraint": "eu-west-2"},
        )

        list_existing_s3_files("mock_bucket", client=s3_client)

        assert "The bucket is empty" in caplog.text

    def test_retrieves_existing_files(self, s3_client, caplog):
        """An uploaded object is returned by key."""
        caplog.set_level(logging.INFO)
        # Use a dedicated bucket created here so the test no longer depends
        # on a bucket left behind by the previous test in this class.
        s3_client.create_bucket(
            Bucket="mock_files_bucket",
            CreateBucketConfiguration={"LocationConstraint": "eu-west-2"},
        )
        s3_client.upload_file("tests/dummy.txt", "mock_files_bucket", "dummy.txt")

        result = list_existing_s3_files("mock_files_bucket", client=s3_client)

        assert result == ["dummy.txt"]
+
+
class TestBucketName:
    """Tests for ``bucket_name`` using the mocked extract and transform buckets."""

    def test_functions_retrieves__extractbucket(
        self, mock_extract_bucket, mock_transform_bucket, s3_client
    ):
        """The extract bucket's name is returned when that bucket exists."""
        found = bucket_name("dummy_extract_buc", s3_client)
        assert found == "dummy_extract_buc"

    def test_transform_bucket_name(
        self, mock_extract_bucket, mock_transform_bucket, s3_client
    ):
        """The transform bucket's name is returned when that bucket exists."""
        found = bucket_name("dummy_transform_buc", s3_client)
        assert found == "dummy_transform_buc"

    def test_recieves_error_when_bucket_doesnt_exist(
        self, mock_extract_bucket, s3_client
    ):
        """Looking up a bucket that has been deleted raises ``ValueError``."""
        # Remove the bucket the fixture created so the lookup has no match.
        s3_client.delete_bucket(Bucket="dummy_extract_buc")

        with pytest.raises(ValueError):
            bucket_name("dummy_extract_buc", s3_client)
+
+
class TestProcessToParquetUploadS3:
    """Tests for ``process_to_parquet_and_upload_to_s3``."""

    def test_func_uploads_to_s3(self, mock_transform_bucket, s3_client):
        """A single frame in the second argument is reported as uploaded."""
        car_rows = np.array(
            [
                ["Truck", "Chevrolet", "Grey"],
                ["Convertible", "Mercedes", "Red"],
                ["Van", "Volkswagen", "Blue"],
            ]
        )
        cars_df = pd.DataFrame(car_rows, columns=["Car_type", "Brand", "Colour"])

        # NOTE(review): ``mock_transform_bucket`` is the ``create_bucket``
        # response, not the bucket name "dummy_transform_buc" -- confirm which
        # one the function under test expects for this argument.
        outcome = process_to_parquet_and_upload_to_s3(
            [], {"car_data": cars_df}, {}, mock_transform_bucket, s3_client
        )

        assert outcome == {"uploaded": ["car_data"], "not_uploaded": []}
git.ajschof.me — hosted by ajschofield — powered by cgit