about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Alex <git@ajschof.me> 2024-09-03 16:08:22 +0100
committer GitHub <noreply@github.com> 2024-09-03 16:08:22 +0100
commit 256e4a2d4cb56814d3a87e89895f5954d148fd5d (patch)
tree 4152f9efe54364a5d6a6cc969befb6cea9015a5b
parent 3b8e89968e3d3d3527ea76b4517b0d7278512530 (diff)
parent ce30178558cc8222e9975273eb5d08a93ae92fcc (diff)
download de-project-bentley-development.tar.gz
de-project-bentley-development.zip
Merge pull request #116 from ajschofield/test/tests_transform_lambda [development]
pr: merge test/tests_transform_lambda into development
-rw-r--r-- .gitignore 9
-rw-r--r-- src/transform_lambda/transform_lambda.py 4
-rw-r--r-- tests/test_transform_lambda.py 162
3 files changed, 146 insertions, 29 deletions
diff --git a/.gitignore b/.gitignore
index 480ae4b..d5e8d2d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,11 +11,10 @@
*.zip
log*
__pycache__/
+*.parquet
+/dim_*
+/fact_*
# OS-Related Files
.DS_Store
-venv
-
-#files
-/dim_*
-/fact_*
\ No newline at end of file
+venv
\ No newline at end of file
diff --git a/src/transform_lambda/transform_lambda.py b/src/transform_lambda/transform_lambda.py
index f782922..54d7d48 100644
--- a/src/transform_lambda/transform_lambda.py
+++ b/src/transform_lambda/transform_lambda.py
@@ -10,7 +10,6 @@ from botocore.exceptions import ClientError
from pg8000.native import Connection, InterfaceError
from datetime import datetime
-
class DBConnectionException(Exception):
"""Wraps pg8000.native Error or DatabaseError."""
@@ -54,6 +53,7 @@ def lambda_handler(event, context):
bucket = bucket_name("transform")
existing_s3_files = list_existing_s3_files(bucket)
+ # print(existing_s3_files)
dict_of_df = read_from_s3_subfolder_to_df(
TABLES, bucket_name("extract"), client=boto3.client("s3")
@@ -123,11 +123,13 @@ def process_to_parquet_and_upload_to_s3(
# changed parquet_file variable to the file name
client.upload_file(f"{table_name}.parquet", bucket, f"{table_name}.parquet")
status["uploaded"].append(table_name)
+ print(status)
for table_name, df in mutable_df_dict.items():
s3_key = datetime.strftime(
datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.parquet"
)
+ print(s3_key, '<<<< this is S3_Key')
parquet_file = df.to_parquet(
f"{table_name}.parquet", engine="pyarrow"
) # or fastparquet
diff --git a/tests/test_transform_lambda.py b/tests/test_transform_lambda.py
index 35d7e3c..73bd9b3 100644
--- a/tests/test_transform_lambda.py
+++ b/tests/test_transform_lambda.py
@@ -1,20 +1,15 @@
-from src.transform_lambda.transform_lambda import (
- read_from_s3_subfolder_to_df,
- list_existing_s3_files,
- bucket_name,
- process_to_parquet_and_upload_to_s3,
-)
-from moto import mock_aws
-import pytest
-import pandas as pd
+from datetime import datetime
+import logging
+import io
import os
+import numpy as np
+from unittest.mock import patch, MagicMock
import boto3
+import pandas as pd
+from moto import mock_aws
from botocore.exceptions import ClientError
-import numpy as np
-
-# import caplog
-import logging
-
+import pytest
+from src.transform_lambda.transform_lambda import read_from_s3_subfolder_to_df, list_existing_s3_files, bucket_name, process_to_parquet_and_upload_to_s3, lambda_handler
logger = logging.getLogger()
logger.setLevel(logging.INFO)
@@ -24,7 +19,7 @@ logger.setLevel(logging.INFO)
def aws_credentials():
os.environ["AWS_ACCESS_KEY_ID"] = "testing"
os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
- os.environ["AWS_SECURIT_TOKEN"] = "testing"
+ os.environ["AWS_SECURITY_TOKEN"] = "testing"
os.environ["AWS_SESSION_TOKEN"] = "testing"
os.environ["AWS_DEFAULT_REGION"] = "eu-west-2"
@@ -34,7 +29,6 @@ def s3_client(aws_credentials):
with mock_aws():
yield boto3.client("s3")
-
@pytest.fixture(scope="class")
def mock_extract_bucket(s3_client):
mock_extract_bucket = s3_client.create_bucket(
@@ -43,7 +37,6 @@ def mock_extract_bucket(s3_client):
)
return mock_extract_bucket
-
@pytest.fixture(scope="class")
def mock_transform_bucket(s3_client):
mock_transform_bucket = s3_client.create_bucket(
@@ -52,9 +45,39 @@ def mock_transform_bucket(s3_client):
)
return mock_transform_bucket
+@pytest.fixture
+def mock_db_connection():
+ with patch('src.transform_lambda.transform_lambda.connect_to_database') as mock_connect:
+ mock_db = MagicMock()
+ mock_connect.return_value = mock_db
+ yield mock_db
+
+@pytest.fixture(scope="function")
+def mock_df_functions():
+ with patch('your_module.create_dim_counterparty') as mock_counterparty, \
+ patch('your_module.create_dim_date') as mock_date, \
+ patch('your_module.create_dim_location') as mock_location, \
+ patch('your_module.create_dim_staff') as mock_staff, \
+ patch('your_module.create_dim_design') as mock_design, \
+ patch('your_module.create_fact_sales_order') as mock_sales, \
+ patch('your_module.create_fact_purchase_orders') as mock_purchase, \
+ patch('your_module.create_fact_payment') as mock_payment, \
+ patch('your_module.create_dim_currency') as mock_currency:
+
+ yield {
+ 'counterparty': mock_counterparty,
+ 'date': mock_date,
+ 'location': mock_location,
+ 'staff': mock_staff,
+ 'design': mock_design,
+ 'sales': mock_sales,
+ 'purchase': mock_purchase,
+ 'payment': mock_payment,
+ 'currency': mock_currency
+ }
+
class TestReadFromS3:
- # @pytest.mark.skip(reason="The test is broken!")
def test_returns_dictionary_with_correct_value_pair(
self, s3_client, mock_extract_bucket
):
@@ -82,7 +105,6 @@ class TestReadFromS3:
assert isinstance(result["Foods"], pd.DataFrame)
assert result["Foods"].eq(expected_df, axis="columns").all(axis=None)
- # @pytest.mark.skip(reason="The test is broken!")
def test_returns_dictionary_of_dataframes_for_multiple_tables(
self, s3_client, mock_extract_bucket
):
@@ -116,7 +138,6 @@ class TestReadFromS3:
)
assert list(result.keys()) == tables
assert result["Foods"].eq(expected_foods_df, axis="columns").all(axis=None)
- # assert result["Cars"].eq(expected_cars_df, axis="columns").all(axis=None)
class TestListExistingFiles:
@@ -171,7 +192,7 @@ class TestBucketName:
class TestProcessToParquetUploadS3:
- def test_func_uploads_to_s3(self, mock_transform_bucket, s3_client):
+ def test_func_doesnt_upload_if_file_exists(self, mock_transform_bucket, s3_client):
expected_cars_df = pd.DataFrame(
np.array(
[
@@ -185,7 +206,102 @@ class TestProcessToParquetUploadS3:
mock_dim_dict = {"car_data": expected_cars_df}
response = process_to_parquet_and_upload_to_s3(
- [], mock_dim_dict, {}, mock_transform_bucket, s3_client
+ ['car_data'], mock_dim_dict, {}, "dummy_transform_buc", s3_client
+ )
+
+ object_list = s3_client.list_objects_v2(Bucket='dummy_transform_buc')
+ s3_uploaded_files = [obj['Key'] for obj in object_list.get('Contents', [])]
+ assert 'car_data.parquet' not in s3_uploaded_files
+ assert response == {"uploaded": [], "not_uploaded": ['car_data']}
+
+ def test_func_uploads_data_if_doesnt_exist(self, mock_transform_bucket, s3_client):
+ expected_flower_df = pd.DataFrame(
+ np.array(
+ [
+ ["Daisy", "White", "Edible"],
+ ["Rose", "Red", "Yes"],
+ ["Daffodil", "Yellow", "No"],
+ ]
+ ),
+ columns=["Flower", "Colour", "Edible"],
+ )
+ mock_dim_dict = {"flower_data": expected_flower_df}
+
+
+ response = process_to_parquet_and_upload_to_s3(
+ ['car_data'], mock_dim_dict, {}, "dummy_transform_buc", s3_client
+ )
+ object_list = s3_client.list_objects_v2(Bucket='dummy_transform_buc')
+ s3_uploaded_files = [obj['Key'] for obj in object_list.get('Contents', [])]
+ # print(s3_uploaded_files, '<<<<<< the FILES IN DUMMY TRASN BUC')
+
+ assert "flower_data.parquet" in s3_uploaded_files
+ assert response == {"uploaded": ['flower_data'], "not_uploaded": []}
+
+ def test_func_uploads_mutable_and_immutable_files(self, mock_transform_bucket, s3_client):
+ expected_vegetable_df = pd.DataFrame(
+ np.array(
+ [
+ ["Carrot", "Orange", "Edible"],
+ ["Broccoli", "Green", "Yes"],
+ ]
+ ),
+ columns=["Vegetable", "Colour", "Edible"],
+ )
+
+ expected_meat_df = pd.DataFrame(
+ np.array(
+ [
+ ["Chicken", "White", "Yes"],
+ ["Beef", "Red", "No"],
+ ]
+ ),
+ columns=["Meat", "Colour", "Edible"],
+ )
+
+ mock_dim_dict = {"vegetable_data": expected_vegetable_df}
+ mock_fact_dict = {"meat_data": expected_meat_df}
+
+ ##mocked an existing file
+ expected_vegetable_df.to_parquet("vegetable_data.parquet", engine="pyarrow")
+ s3_client.upload_file("vegetable_data.parquet", 'dummy_transform_buc', "vegetable_data.parquet")
+
+
+ response = process_to_parquet_and_upload_to_s3(
+ ['vegetable_data'], mock_dim_dict, mock_fact_dict, "dummy_transform_buc", s3_client
)
+ object_list = s3_client.list_objects_v2(Bucket='dummy_transform_buc')
+ s3_uploaded_files = [obj['Key'] for obj in object_list.get('Contents', [])]
+
+ time_prefix = datetime.strftime(datetime.today(), "meat_data/%Y/%m/%d/meat_data_%H:%M:%S.parquet")
+ assert any(key.startswith("meat_data/") and key.endswith(".parquet") for key in s3_uploaded_files)
+ assert 'vegetable_data.parquet' in s3_uploaded_files
+ assert response == {"uploaded": ['meat_data'], "not_uploaded": ['vegetable_data']}
+
+ def test_func_handles_empty_dicts(self, mock_transform_bucket, s3_client):
+ response = process_to_parquet_and_upload_to_s3(
+ [], {}, {}, 'dummy_transform_buc', s3_client
+ )
+
+ assert response == {"uploaded": [], "not_uploaded": []}
+
+class TestLambdaHandler:
+ def test_func_reads_from_extract_bucket(self, s3_client, mock_db_connection, mock_extract_bucket, mock_transform_bucket):
+ mock_csv = "id,name\n1,Lauryn\n2,Hill"
+ s3_client.put_object(Bucket='dummy_extract_buc',
+ Key="mock_table.csv",
+ Body=mock_csv)
+
+ with patch('src.transform_lambda.transform_lambda.read_from_s3_subfolder_to_df') as mock_read, \
+ patch('src.transform_lambda.transform_lambda.bucket_name', return_value="dummy_extract_buc") as mock_bucket_name:
+
+ mock_read.return_value = {'sample_mock_table': pd.read_csv(io.StringIO(mock_csv))}
+
+ lambda_handler({}, {})
+
+ mock_read.assert_called_once()
+
+ args, kwargs = mock_read.call_args
+ assert kwargs.get('bucket') == mock_bucket_name.return_value
+
- assert response == {"uploaded": ["car_data"], "not_uploaded": []}
git.ajschof.me — hosted by ajschofield — powered by cgit