From f9f1ebc3eb7a9d4f312db5c1402a0197e0777b29 Mon Sep 17 00:00:00 2001 From: lian-manonog Date: Wed, 28 Aug 2024 12:42:29 +0100 Subject: wip: testing the process helper function --- tests/test_transform_lambda.py | 36 ++++++++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 10 deletions(-) (limited to 'tests/test_transform_lambda.py') diff --git a/tests/test_transform_lambda.py b/tests/test_transform_lambda.py index 6cf3a09..3d6e82a 100644 --- a/tests/test_transform_lambda.py +++ b/tests/test_transform_lambda.py @@ -1,17 +1,13 @@ -from src.transform_lambda import ( - read_from_s3_subfolder_to_df, - list_existing_s3_files, - bucket_name, - process_to_parquet_and_upload_to_s3, -) -from moto import mock_aws +from src.transform_lambda.transform_lambda import read_from_s3_subfolder_to_df, list_existing_s3_files, bucket_name, process_to_parquet_and_upload_to_s3 import pytest import pandas as pd +from moto import mock_aws import os import boto3 from botocore.exceptions import ClientError import numpy as np +# /home/lianmei/northcoders/projects/de-project-bentley/src/transform_lambda/transform_lambda.py # import caplog import logging @@ -171,7 +167,7 @@ class TestBucketName: class TestProcessToParquetUploadS3: - def test_func_uploads_to_s3(self, mock_transform_bucket, s3_client): + def test_func_doesnt_upoad_if_file_exists(self, mock_transform_bucket, s3_client): expected_cars_df = pd.DataFrame( np.array( [ @@ -185,7 +181,27 @@ class TestProcessToParquetUploadS3: mock_dim_dict = {"car_data": expected_cars_df} response = process_to_parquet_and_upload_to_s3( - [], mock_dim_dict, {}, mock_transform_bucket, s3_client + ['car_data'], mock_dim_dict, {}, mock_transform_bucket, s3_client + ) + + assert response == {"uploaded": [], "not_uploaded": ['car_data']} + + def test_func_uploads_data_if_doesnt_exist(self, mock_transform_bucket, s3_client): + expected_flower_df = pd.DataFrame( + np.array( + [ + ["Daisy", "White", "Edible"], + ["Rose", "Red", "Yes"], + ["Daffodil", "Yellow", "No"], + ] + ), + columns=["Flower", "Colour", "Edible"], + ) + mock_dim_dict = {"flower_data": expected_flower_df} + + response = process_to_parquet_and_upload_to_s3( + ['car_data'], mock_dim_dict, {}, mock_transform_bucket, s3_client ) - assert response == {"uploaded": ["car_data"], "not_uploaded": []} + assert response == {"uploaded": ['flower_data'], "not_uploaded": ['car_data']} + # assert \ No newline at end of file -- cgit v1.2.3 From fadc54b7f72eca1eccbb9f4e7bb8ffca0960ebfa Mon Sep 17 00:00:00 2001 From: lian-manonog Date: Wed, 28 Aug 2024 14:59:05 +0100 Subject: wip finished testing the process and upload parquet --- src/transform_lambda/transform_lambda.py | 6 ++-- tests/test_transform_lambda.py | 61 +++++++++++++++++++++++++++++--- 2 files changed, 60 insertions(+), 7 deletions(-) (limited to 'tests/test_transform_lambda.py') diff --git a/src/transform_lambda/transform_lambda.py b/src/transform_lambda/transform_lambda.py index 3dbb57b..478b257 100644 --- a/src/transform_lambda/transform_lambda.py +++ b/src/transform_lambda/transform_lambda.py @@ -5,12 +5,11 @@ import logging import pandas as pd import pyarrow as pa import pyarrow.parquet as pq -from dataframes import * +from src.transform_lambda.dataframes import * from botocore.exceptions import ClientError from pg8000.native import Connection, InterfaceError from datetime import datetime - class DBConnectionException(Exception): """Wraps pg8000.native Error or DatabaseError.""" @@ -54,6 +53,7 @@ def lambda_handler(event, context): bucket = bucket_name("transform") existing_s3_files = list_existing_s3_files(bucket) + # print(existing_s3_files) dict_of_df = read_from_s3_subfolder_to_df( TABLES, bucket_name("extract"), client=boto3.client("s3") @@ -120,11 +120,13 @@ def process_to_parquet_and_upload_to_s3( # changed parquet_file variable to the file name client.upload_file(f"{table_name}.parquet", bucket, f"{table_name}.parquet") status["uploaded"].append(table_name) + print(status) for table_name, df in mutable_df_dict.items(): s3_key = datetime.strftime( datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.parquet" ) + print(s3_key, '<<<< this is S3_Key') parquet_file = df.to_parquet( f"{table_name}.parquet", engine="pyarrow" ) # or fastparquet diff --git a/tests/test_transform_lambda.py b/tests/test_transform_lambda.py index 3d6e82a..0961301 100644 --- a/tests/test_transform_lambda.py +++ b/tests/test_transform_lambda.py @@ -167,7 +167,7 @@ class TestBucketName: class TestProcessToParquetUploadS3: - def test_func_doesnt_upoad_if_file_exists(self, mock_transform_bucket, s3_client): + def test_func_doesnt_upload_if_file_exists(self, mock_transform_bucket, s3_client): expected_cars_df = pd.DataFrame( np.array( [ @@ -181,9 +181,13 @@ class TestProcessToParquetUploadS3: mock_dim_dict = {"car_data": expected_cars_df} response = process_to_parquet_and_upload_to_s3( - ['car_data'], mock_dim_dict, {}, mock_transform_bucket, s3_client + ['car_data'], mock_dim_dict, {}, "dummy_transform_buc", s3_client ) + # keys = s3_client.get_object( + # Bucket='dummy_transform_buc', + # Key='car_data.parquet' + # ) assert response == {"uploaded": [], "not_uploaded": ['car_data']} def test_func_uploads_data_if_doesnt_exist(self, mock_transform_bucket, s3_client): @@ -199,9 +203,56 @@ class TestProcessToParquetUploadS3: ) mock_dim_dict = {"flower_data": expected_flower_df} + response = process_to_parquet_and_upload_to_s3( - ['car_data'], mock_dim_dict, {}, mock_transform_bucket, s3_client + ['car_data'], mock_dim_dict, {}, "dummy_transform_buc", s3_client + ) + + assert response == {"uploaded": ['flower_data'], "not_uploaded": []} + + def test_func_uploads_several_files_and_checks_for_parquet_files(self, mock_transform_bucket, s3_client): + expected_vegetable_df = pd.DataFrame( + np.array( + [ + ["Carrot", "Orange", "Edible"], + ["Broccoli", "Green", "Yes"], + ] + ), + columns=["Vegetable", "Colour", "Edible"], + ) + + expected_meat_df = pd.DataFrame( + np.array( + [ + ["Chicken", "White", "Yes"], + ["Beef", "Red", "No"], + ] + ), + columns=["Meat", "Colour", "Edible"], ) - assert response == {"uploaded": ['flower_data'], "not_uploaded": ['car_data']} - # assert \ No newline at end of file + mock_dim_dict = {"vegetable_data": expected_vegetable_df} + mock_fact_dict = {"meat_data": expected_meat_df} + + expected_vegetable_df.to_parquet("vegetable_data.parquet", engine="pyarrow") + s3_client.upload_file("vegetable_data.parquet", 'dummy_transform_buc', "vegetable_data.parquet") + + print(f"Type of mock_transform_bucket: {type(mock_transform_bucket)}") + print(f"Type of mock_dim_dict: {type(mock_dim_dict)}") + print(f"Type of items in mock_dim_dict: {[type(i) for i in mock_dim_dict.values()]}") + print(f"Type of s3_client: {type(s3_client)}") + + response = process_to_parquet_and_upload_to_s3( + ['vegetable_data'], mock_dim_dict, mock_fact_dict, "dummy_transform_buc", s3_client + ) + + assert response == {"uploaded": ['meat_data'], "not_uploaded": ['vegetable_data']} + + def test_func_handles_empty_dicts(self, mock_transform_bucket, s3_client): + response = process_to_parquet_and_upload_to_s3( + [], {}, {}, 'dummy_transform_buc', s3_client + ) + + assert response == {"uploaded": [], "not_uploaded": []} + +class TestLambdaHandler \ No newline at end of file -- cgit v1.2.3 From ec583ca56a6e25d5abcee2b6575890ad9f2e155c Mon Sep 17 00:00:00 2001 From: lian-manonog Date: Wed, 28 Aug 2024 15:40:48 +0100 Subject: wip: completed and added to testprocesstoparquetuploads3 --- tests/test_transform_lambda.py | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) (limited to 'tests/test_transform_lambda.py') diff --git a/tests/test_transform_lambda.py b/tests/test_transform_lambda.py index 0961301..cf0723a 100644 --- a/tests/test_transform_lambda.py +++ b/tests/test_transform_lambda.py @@ -6,6 +6,7 @@ import os import boto3 from botocore.exceptions import ClientError import numpy as np +from datetime import datetime # /home/lianmei/northcoders/projects/de-project-bentley/src/transform_lambda/transform_lambda.py # import caplog @@ -184,10 +185,9 @@ class TestProcessToParquetUploadS3: ['car_data'], mock_dim_dict, {}, "dummy_transform_buc", s3_client ) - # keys = s3_client.get_object( - # Bucket='dummy_transform_buc', - # Key='car_data.parquet' - # ) + object_list = s3_client.list_objects_v2(Bucket='dummy_transform_buc') + s3_uploaded_files = [obj['Key'] for obj in object_list.get('Contents', [])] + assert 'car_data.parquet' not in s3_uploaded_files assert response == {"uploaded": [], "not_uploaded": ['car_data']} def test_func_uploads_data_if_doesnt_exist(self, mock_transform_bucket, s3_client): @@ -207,10 +207,14 @@ class TestProcessToParquetUploadS3: response = process_to_parquet_and_upload_to_s3( ['car_data'], mock_dim_dict, {}, "dummy_transform_buc", s3_client ) + object_list = s3_client.list_objects_v2(Bucket='dummy_transform_buc') + s3_uploaded_files = [obj['Key'] for obj in object_list.get('Contents', [])] + print(s3_uploaded_files, '<<<<<< the FILES IN DUMMY TRASN BUC') + assert "flower_data.parquet" in s3_uploaded_files assert response == {"uploaded": ['flower_data'], "not_uploaded": []} - def test_func_uploads_several_files_and_checks_for_parquet_files(self, mock_transform_bucket, s3_client): + def test_func_uploads_mutable_and_immutable_files(self, mock_transform_bucket, s3_client): expected_vegetable_df = pd.DataFrame( np.array( [ @@ -234,18 +238,20 @@ class TestProcessToParquetUploadS3: mock_dim_dict = {"vegetable_data": expected_vegetable_df} mock_fact_dict = {"meat_data": expected_meat_df} + ##mocked an existing file expected_vegetable_df.to_parquet("vegetable_data.parquet", engine="pyarrow") s3_client.upload_file("vegetable_data.parquet", 'dummy_transform_buc', "vegetable_data.parquet") - print(f"Type of mock_transform_bucket: {type(mock_transform_bucket)}") - print(f"Type of mock_dim_dict: {type(mock_dim_dict)}") - print(f"Type of items in mock_dim_dict: {[type(i) for i in mock_dim_dict.values()]}") - print(f"Type of s3_client: {type(s3_client)}") - + response = process_to_parquet_and_upload_to_s3( ['vegetable_data'], mock_dim_dict, mock_fact_dict, "dummy_transform_buc", s3_client ) + object_list = s3_client.list_objects_v2(Bucket='dummy_transform_buc') + s3_uploaded_files = [obj['Key'] for obj in object_list.get('Contents', [])] + time_prefix = datetime.strftime(datetime.today(), "meat_data/%Y/%m/%d/meat_data_%H:%M:%S.parquet") + assert any(key.startswith("meat_data/") and key.endswith(".parquet") for key in s3_uploaded_files) + assert 'vegetable_data.parquet' in s3_uploaded_files assert response == {"uploaded": ['meat_data'], "not_uploaded": ['vegetable_data']} def test_func_handles_empty_dicts(self, mock_transform_bucket, s3_client): @@ -255,4 +261,6 @@ class TestProcessToParquetUploadS3: assert response == {"uploaded": [], "not_uploaded": []} -class TestLambdaHandler \ No newline at end of file +class TestLambdaHandler: + def test_(self): + pass \ No newline at end of file -- cgit v1.2.3 From e0fee39bd2c72a40cfb065e0b0d93b2122d4b7f4 Mon Sep 17 00:00:00 2001 From: lian-manonog Date: Wed, 28 Aug 2024 17:08:16 +0100 Subject: wip: testing lambda handler. Added Dict of Df patch fixture --- tests/test_transform_lambda.py | 66 ++++++++++++++++++++++++++++++++---------- 1 file changed, 50 insertions(+), 16 deletions(-) (limited to 'tests/test_transform_lambda.py') diff --git a/tests/test_transform_lambda.py b/tests/test_transform_lambda.py index cf0723a..bc070fe 100644 --- a/tests/test_transform_lambda.py +++ b/tests/test_transform_lambda.py @@ -1,17 +1,15 @@ -from src.transform_lambda.transform_lambda import read_from_s3_subfolder_to_df, list_existing_s3_files, bucket_name, process_to_parquet_and_upload_to_s3 -import pytest -import pandas as pd -from moto import mock_aws +from datetime import datetime +import logging +import io import os +import numpy as np +from unittest.mock import patch, MagicMock import boto3 +import pandas as pd +from moto import mock_aws from botocore.exceptions import ClientError -import numpy as np -from datetime import datetime - -# /home/lianmei/northcoders/projects/de-project-bentley/src/transform_lambda/transform_lambda.py -# import caplog -import logging - +import pytest +from src.transform_lambda.transform_lambda import read_from_s3_subfolder_to_df, list_existing_s3_files, bucket_name, process_to_parquet_and_upload_to_s3, lambda_handler logger = logging.getLogger() logger.setLevel(logging.INFO) @@ -40,7 +38,6 @@ def mock_extract_bucket(s3_client): ) return mock_extract_bucket - @pytest.fixture(scope="class") def mock_transform_bucket(s3_client): mock_transform_bucket = s3_client.create_bucket( @@ -49,6 +46,30 @@ def mock_transform_bucket(s3_client): ) return mock_transform_bucket +@pytest.fixture(scope="function") +def mock_df_creation_functions(): + with patch('your_module.create_dim_counterparty') as mock_counterparty, \ + patch('your_module.create_dim_date') as mock_date, \ + patch('your_module.create_dim_location') as mock_location, \ + patch('your_module.create_dim_staff') as mock_staff, \ + patch('your_module.create_dim_design') as mock_design, \ + patch('your_module.create_fact_sales_order') as mock_sales, \ + patch('your_module.create_fact_purchase_orders') as mock_purchase, \ + patch('your_module.create_fact_payment') as mock_payment, \ + patch('your_module.create_dim_currency') as mock_currency: + + yield { + 'counterparty': mock_counterparty, + 'date': mock_date, + 'location': mock_location, + 'staff': mock_staff, + 'design': mock_design, + 'sales': mock_sales, + 'purchase': mock_purchase, + 'payment': mock_payment, + 'currency': mock_currency + } + class TestReadFromS3: # @pytest.mark.skip(reason="The test is broken!") @@ -113,7 +134,6 @@ class TestReadFromS3: ) assert list(result.keys()) == tables assert result["Foods"].eq(expected_foods_df, axis="columns").all(axis=None) - # assert result["Cars"].eq(expected_cars_df, axis="columns").all(axis=None) class TestListExistingFiles: @@ -209,7 +229,7 @@ class TestProcessToParquetUploadS3: ) object_list = s3_client.list_objects_v2(Bucket='dummy_transform_buc') s3_uploaded_files = [obj['Key'] for obj in object_list.get('Contents', [])] - print(s3_uploaded_files, '<<<<<< the FILES IN DUMMY TRASN BUC') + # print(s3_uploaded_files, '<<<<<< the FILES IN DUMMY TRASN BUC') assert "flower_data.parquet" in s3_uploaded_files assert response == {"uploaded": ['flower_data'], "not_uploaded": []} @@ -262,5 +282,19 @@ class TestProcessToParquetUploadS3: assert response == {"uploaded": [], "not_uploaded": []} class TestLambdaHandler: - def test_(self): - pass \ No newline at end of file + def test_func_reads_from_extract_bucket(self, s3_client, mock_extract_bucket, mock_transform_bucket): + mock_db = MagicMock() + mock_connect.return_value = mock_db + mock_csv = "id,name\n1,Lauryn\n2,Hill" + s3_client.put_object(Bucket='dummy_extract_buc', + Key="mock_table.csv", + Body=mock_csv) + + with patch('src.transform_lambda.transform_lambda.read_from_s3_subfolder_to_df') as mock_read: + mock_read.return_value = {'sample_table': pd.read_csv(io.StringIO(mock_csv))} + + lambda_handler({}, {}) + + mock_read.assert_called_once() + + -- cgit v1.2.3 From b25820e3284fff2f7ada52e321804dbb67c4a0ae Mon Sep 17 00:00:00 2001 From: lian-manonog Date: Wed, 28 Aug 2024 17:43:16 +0100 Subject: wip: lambda handler tests not working - suspecting its something in dataframes --- tests/test_transform_lambda.py | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) (limited to 'tests/test_transform_lambda.py') diff --git a/tests/test_transform_lambda.py b/tests/test_transform_lambda.py index bc070fe..7e823f1 100644 --- a/tests/test_transform_lambda.py +++ b/tests/test_transform_lambda.py @@ -19,7 +19,7 @@ logger.setLevel(logging.INFO) def aws_credentials(): os.environ["AWS_ACCESS_KEY_ID"] = "testing" os.environ["AWS_SECRET_ACCESS_KEY"] = "testing" - os.environ["AWS_SECURIT_TOKEN"] = "testing" + os.environ["AWS_SECURITY_TOKEN"] = "testing" os.environ["AWS_SESSION_TOKEN"] = "testing" os.environ["AWS_DEFAULT_REGION"] = "eu-west-2" @@ -29,7 +29,6 @@ def s3_client(aws_credentials): with mock_aws(): yield boto3.client("s3") - @pytest.fixture(scope="class") def mock_extract_bucket(s3_client): mock_extract_bucket = s3_client.create_bucket( @@ -46,8 +45,15 @@ def mock_transform_bucket(s3_client): ) return mock_transform_bucket +@pytest.fixture +def mock_db_connection(): + with patch('src.transform_lambda.transform_lambda.connect_to_database') as mock_connect: + mock_db = MagicMock() + mock_connect.return_value = mock_db + yield mock_db + @pytest.fixture(scope="function") -def mock_df_creation_functions(): +def mock_df_functions(): with patch('your_module.create_dim_counterparty') as mock_counterparty, \ patch('your_module.create_dim_date') as mock_date, \ patch('your_module.create_dim_location') as mock_location, \ @@ -72,7 +78,6 @@ def mock_df_creation_functions(): class TestReadFromS3: - # @pytest.mark.skip(reason="The test is broken!") def test_returns_dictionary_with_correct_value_pair( self, s3_client, mock_extract_bucket ): @@ -100,7 +105,6 @@ class TestReadFromS3: assert isinstance(result["Foods"], pd.DataFrame) assert result["Foods"].eq(expected_df, axis="columns").all(axis=None) - # @pytest.mark.skip(reason="The test is broken!") def test_returns_dictionary_of_dataframes_for_multiple_tables( self, s3_client, mock_extract_bucket ): @@ -282,19 +286,22 @@ class TestProcessToParquetUploadS3: assert response == {"uploaded": [], "not_uploaded": []} class TestLambdaHandler: - def test_func_reads_from_extract_bucket(self, s3_client, mock_extract_bucket, mock_transform_bucket): - mock_db = MagicMock() - mock_connect.return_value = mock_db + def test_func_reads_from_extract_bucket(self, s3_client, mock_db_connection, mock_extract_bucket, mock_transform_bucket): mock_csv = "id,name\n1,Lauryn\n2,Hill" s3_client.put_object(Bucket='dummy_extract_buc', Key="mock_table.csv", Body=mock_csv) - with patch('src.transform_lambda.transform_lambda.read_from_s3_subfolder_to_df') as mock_read: - mock_read.return_value = {'sample_table': pd.read_csv(io.StringIO(mock_csv))} + with patch('src.transform_lambda.transform_lambda.read_from_s3_subfolder_to_df') as mock_read, \ + patch('src.transform_lambda.transform_lambda.bucket_name', return_value="dummy_extract_buc") as mock_bucket_name: + mock_read.return_value = {'sample_mock_table': pd.read_csv(io.StringIO(mock_csv))} + lambda_handler({}, {}) mock_read.assert_called_once() + + args, kwargs = mock_read.call_args + assert kwargs.get('bucket') == mock_bucket_name.return_value -- cgit v1.2.3 From e4e360630c90d7e801d99097b3e46e8299ab901d Mon Sep 17 00:00:00 2001 From: lian-manonog Date: Thu, 29 Aug 2024 12:25:33 +0100 Subject: wip: lambdahandler tests fail - unexpected errors --- .gitignore | 5 ++++- tests/test_transform_lambda.py | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) (limited to 'tests/test_transform_lambda.py') diff --git a/.gitignore b/.gitignore index 6aa03fc..80f83ae 100644 --- a/.gitignore +++ b/.gitignore @@ -14,4 +14,7 @@ __pycache__/ # OS-Related Files .DS_Store -venv \ No newline at end of file +venv + +##testing +*.parquet \ No newline at end of file diff --git a/tests/test_transform_lambda.py b/tests/test_transform_lambda.py index 7e823f1..73bd9b3 100644 --- a/tests/test_transform_lambda.py +++ b/tests/test_transform_lambda.py @@ -296,7 +296,7 @@ class TestLambdaHandler: patch('src.transform_lambda.transform_lambda.bucket_name', return_value="dummy_extract_buc") as mock_bucket_name: mock_read.return_value = {'sample_mock_table': pd.read_csv(io.StringIO(mock_csv))} - + lambda_handler({}, {}) mock_read.assert_called_once() -- cgit v1.2.3