diff options
| -rw-r--r-- | .gitignore | 9 | ||||
| -rw-r--r-- | car_data.parquet | bin | 2827 -> 0 bytes | |||
| -rw-r--r-- | requirements_lambda_01.txt | 3 | ||||
| -rw-r--r-- | requirements_lambda_02.txt | 9 | ||||
| -rwxr-xr-x | scripts/make_layer_zip.sh | 17 | ||||
| -rw-r--r-- | src/transform_lambda/transform_lambda.py | 4 | ||||
| -rw-r--r-- | terraform/events.tf | 4 | ||||
| -rw-r--r-- | terraform/lambda.tf | 81 | ||||
| -rw-r--r-- | tests/test_transform_lambda.py | 162 |
9 files changed, 223 insertions, 66 deletions
@@ -11,11 +11,10 @@ *.zip log* __pycache__/ +*.parquet +/dim_* +/fact_* # OS-Related Files .DS_Store -venv - -#files -/dim_* -/fact_*
\ No newline at end of file +venv
\ No newline at end of file diff --git a/car_data.parquet b/car_data.parquet Binary files differdeleted file mode 100644 index 1853af6..0000000 --- a/car_data.parquet +++ /dev/null diff --git a/requirements_lambda_01.txt b/requirements_lambda_01.txt new file mode 100644 index 0000000..10f56be --- /dev/null +++ b/requirements_lambda_01.txt @@ -0,0 +1,3 @@ +boto3 +botocore +pg8000
\ No newline at end of file diff --git a/requirements_lambda_02.txt b/requirements_lambda_02.txt new file mode 100644 index 0000000..20c88d7 --- /dev/null +++ b/requirements_lambda_02.txt @@ -0,0 +1,9 @@ +pandas +pyarrow +SQLAlchemy +auto_mix_prep +beautifulsoup4 +boto3 +botocore +pg8000 +Requests
\ No newline at end of file diff --git a/scripts/make_layer_zip.sh b/scripts/make_layer_zip.sh index eabe301..7f64873 100755 --- a/scripts/make_layer_zip.sh +++ b/scripts/make_layer_zip.sh @@ -1,8 +1,17 @@ -# Description: Make the zip file for the layer +# Description: Make the zip file for the layer for the extract lambda function cd "$(dirname "$0")/.." + +# Layer 01 +mkdir -p python/lib/python3.11/site-packages +pip3 install --upgrade -r requirements_lambda_01.txt -t python/lib/python3.11/site-packages +rm layer_01.zip +zip -r layer_01.zip python +rm -r python/ + +# Layer 02 mkdir -p python/lib/python3.11/site-packages -pip3 install --upgrade -r requirements.txt -t python/lib/python3.11/site-packages -rm layer.zip -zip -r layer.zip python +pip3 install --upgrade -r requirements_lambda_02.txt -t python/lib/python3.11/site-packages +rm layer_02.zip +zip -r layer_02.zip python rm -r python/ diff --git a/src/transform_lambda/transform_lambda.py b/src/transform_lambda/transform_lambda.py index f782922..54d7d48 100644 --- a/src/transform_lambda/transform_lambda.py +++ b/src/transform_lambda/transform_lambda.py @@ -10,7 +10,6 @@ from botocore.exceptions import ClientError from pg8000.native import Connection, InterfaceError from datetime import datetime - class DBConnectionException(Exception): """Wraps pg8000.native Error or DatabaseError.""" @@ -54,6 +53,7 @@ def lambda_handler(event, context): bucket = bucket_name("transform") existing_s3_files = list_existing_s3_files(bucket) + # print(existing_s3_files) dict_of_df = read_from_s3_subfolder_to_df( TABLES, bucket_name("extract"), client=boto3.client("s3") @@ -123,11 +123,13 @@ def process_to_parquet_and_upload_to_s3( # changed parquet_file variable to the file name client.upload_file(f"{table_name}.parquet", bucket, f"{table_name}.parquet") status["uploaded"].append(table_name) + print(status) for table_name, df in mutable_df_dict.items(): s3_key = datetime.strftime( datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.parquet" ) + print(s3_key, '<<<< this is S3_Key') parquet_file = df.to_parquet( f"{table_name}.parquet", engine="pyarrow" ) # or fastparquet diff --git a/terraform/events.tf b/terraform/events.tf index 53ae10a..7f8f641 100644 --- a/terraform/events.tf +++ b/terraform/events.tf @@ -86,7 +86,7 @@ resource "aws_s3_bucket_notification" "extract_bucket_notification" { resource "aws_lambda_permission" "allow_s3_transform_bucket" { statement_id = "AllowS3InvokeLambdaTransform${random_string.s3_transform_suffix.result}" action = "lambda:InvokeFunction" - function_name = aws_lambda_function.transform_lambda.function_name + function_name = aws_lambda_function.load_lambda.function_name principal = "s3.amazonaws.com" source_arn = aws_s3_bucket.transform_bucket.arn @@ -102,7 +102,7 @@ resource "aws_s3_bucket_notification" "transform_bucket_notification" { lambda_function { events = ["s3:ObjectCreated:*"] - lambda_function_arn = aws_lambda_function.transform_lambda.arn + lambda_function_arn = aws_lambda_function.load_lambda.arn } depends_on = [aws_lambda_permission.allow_s3_transform_bucket] diff --git a/terraform/lambda.tf b/terraform/lambda.tf index b6f36fb..1e12180 100644 --- a/terraform/lambda.tf +++ b/terraform/lambda.tf @@ -3,46 +3,65 @@ #################### locals { - layer_dir = "../" - layer_zip = "layer.zip" - layer_name = "lambda_layer" - script_dir = "../scripts" - layer_zip_path = "${local.layer_dir}/${local.layer_zip}" + layer_dir = "../" + layer_zip_01 = "layer_01.zip" + layer_zip_02 = "layer_02.zip" + layer_name_01 = "lambda_layer_01" + layer_name_02 = "lambda_layer_02" + script_dir = "../scripts" + layer_zip_01_path = "${local.layer_dir}${local.layer_zip_01}" + layer_zip_02_path = "${local.layer_dir}${local.layer_zip_02}" } -###################### -# Lambda Layer Setup # -###################### - resource "null_resource" "prepare_layer" { - - # New change: only run the script if the layer zip does not exist - + provisioner "local-exec" { + command = "bash ${local.script_dir}/make_layer_zip.sh" + } triggers = { - layer_zip_exists = fileexists(local.layer_zip_path) ? "exists" : "not_exists" + always_run = timestamp() } +} - provisioner "local-exec" { - command = "if [ ! -f ${local.layer_zip_path} ]; then bash ${local.script_dir}/make_layer_zip.sh; fi" - } +################################ +# Lambda Layer (Extract) Setup # +################################ +resource "aws_s3_object" "lambda_layer_zip_01" { + bucket = aws_s3_bucket.lambda_code_bucket.id #bucket instead of id + key = "${local.layer_name_01}/${local.layer_zip_01}" + source = "${local.layer_dir}${local.layer_zip_01}" + depends_on = [null_resource.prepare_layer] + etag = fileexists(local.layer_zip_01_path) ? filemd5(local.layer_zip_01_path) : null + force_destroy = true +} + +resource "aws_lambda_layer_version" "lambda_layer_01" { + layer_name = local.layer_name_01 + compatible_runtimes = ["python3.11"] + s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket + s3_key = aws_s3_object.lambda_layer_zip_01.key + source_code_hash = fileexists(local.layer_zip_01_path) ? filebase64sha256(local.layer_zip_01_path) : null + depends_on = [aws_s3_object.lambda_layer_zip_01] } -resource "aws_s3_object" "lambda_layer_zip" { - bucket = aws_s3_bucket.lambda_code_bucket.id #bucket instead of id - key = "${local.layer_name}/${local.layer_zip}" - source = "${local.layer_dir}/${local.layer_zip}" - depends_on = [null_resource.prepare_layer] - etag = fileexists(local.layer_zip_path) ? filemd5(local.layer_zip_path) : null +######################################### +# Lambda Layer (Load & Transform) Setup # +######################################### +resource "aws_s3_object" "lambda_layer_zip_02" { + bucket = aws_s3_bucket.lambda_code_bucket.id #bucket instead of id + key = "${local.layer_name_02}/${local.layer_zip_02}" + source = "${local.layer_dir}${local.layer_zip_02}" + depends_on = [null_resource.prepare_layer] + etag = fileexists(local.layer_zip_02_path) ? filemd5(local.layer_zip_02_path) : null + force_destroy = true } -resource "aws_lambda_layer_version" "lambda_layer" { - layer_name = local.layer_name +resource "aws_lambda_layer_version" "lambda_layer_02" { + layer_name = local.layer_name_02 compatible_runtimes = ["python3.11"] s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket - s3_key = aws_s3_object.lambda_layer_zip.key - source_code_hash = fileexists(local.layer_zip_path) ? filebase64sha256(local.layer_zip_path) : null - skip_destroy = true - depends_on = [aws_s3_object.lambda_layer_zip] + s3_key = aws_s3_object.lambda_layer_zip_02.key + source_code_hash = fileexists(local.layer_zip_02_path) ? filebase64sha256(local.layer_zip_02_path) : null + depends_on = [aws_s3_object.lambda_layer_zip_02] } ########################### @@ -65,7 +84,7 @@ resource "aws_lambda_function" "extract_lambda" { function_name = var.extract_lambda_name s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket s3_key = aws_s3_object.extract_lambda_code.key - layers = [aws_lambda_layer_version.lambda_layer.arn] + layers = [aws_lambda_layer_version.lambda_layer_01.arn] role = aws_iam_role.multi_service_role.arn handler = "extract_lambda.lambda_handler" runtime = "python3.11" @@ -101,7 +120,7 @@ resource "aws_lambda_function" "transform_lambda" { function_name = var.transform_lambda_name s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket s3_key = aws_s3_object.transform_lambda_code.key - layers = [aws_lambda_layer_version.lambda_layer.arn] + layers = [aws_lambda_layer_version.lambda_layer_02.arn] role = aws_iam_role.multi_service_role.arn handler = "transform_lambda.lambda_handler" runtime = "python3.11" @@ -135,7 +154,7 @@ resource "aws_lambda_function" "load_lambda" { function_name = var.load_lambda_name s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket s3_key = aws_s3_object.load_lambda_code.key - layers = [aws_lambda_layer_version.lambda_layer.arn] + layers = [aws_lambda_layer_version.lambda_layer_02.arn] role = aws_iam_role.multi_service_role.arn handler = "load_lambda.lambda_handler" runtime = "python3.11" diff --git a/tests/test_transform_lambda.py b/tests/test_transform_lambda.py index 35d7e3c..73bd9b3 100644 --- a/tests/test_transform_lambda.py +++ b/tests/test_transform_lambda.py @@ -1,20 +1,15 @@ -from src.transform_lambda.transform_lambda import ( - read_from_s3_subfolder_to_df, - list_existing_s3_files, - bucket_name, - process_to_parquet_and_upload_to_s3, -) -from moto import mock_aws -import pytest -import pandas as pd +from datetime import datetime +import logging +import io import os +import numpy as np +from unittest.mock import patch, MagicMock import boto3 +import pandas as pd +from moto import mock_aws from botocore.exceptions import ClientError -import numpy as np - -# import caplog -import logging - +import pytest +from src.transform_lambda.transform_lambda import read_from_s3_subfolder_to_df, list_existing_s3_files, bucket_name, process_to_parquet_and_upload_to_s3, lambda_handler logger = logging.getLogger() logger.setLevel(logging.INFO) @@ -24,7 +19,7 @@ logger.setLevel(logging.INFO) def aws_credentials(): os.environ["AWS_ACCESS_KEY_ID"] = "testing" os.environ["AWS_SECRET_ACCESS_KEY"] = "testing" - os.environ["AWS_SECURIT_TOKEN"] = "testing" + os.environ["AWS_SECURITY_TOKEN"] = "testing" os.environ["AWS_SESSION_TOKEN"] = "testing" os.environ["AWS_DEFAULT_REGION"] = "eu-west-2" @@ -34,7 +29,6 @@ def s3_client(aws_credentials): with mock_aws(): yield boto3.client("s3") - @pytest.fixture(scope="class") def mock_extract_bucket(s3_client): mock_extract_bucket = s3_client.create_bucket( @@ -43,7 +37,6 @@ def mock_extract_bucket(s3_client): ) return mock_extract_bucket - @pytest.fixture(scope="class") def mock_transform_bucket(s3_client): mock_transform_bucket = s3_client.create_bucket( @@ -52,9 +45,39 @@ def mock_transform_bucket(s3_client): ) return mock_transform_bucket +@pytest.fixture +def mock_db_connection(): + with patch('src.transform_lambda.transform_lambda.connect_to_database') as mock_connect: + mock_db = MagicMock() + mock_connect.return_value = mock_db + yield mock_db + +@pytest.fixture(scope="function") +def mock_df_functions(): + with patch('your_module.create_dim_counterparty') as mock_counterparty, \ + patch('your_module.create_dim_date') as mock_date, \ + patch('your_module.create_dim_location') as mock_location, \ + patch('your_module.create_dim_staff') as mock_staff, \ + patch('your_module.create_dim_design') as mock_design, \ + patch('your_module.create_fact_sales_order') as mock_sales, \ + patch('your_module.create_fact_purchase_orders') as mock_purchase, \ + patch('your_module.create_fact_payment') as mock_payment, \ + patch('your_module.create_dim_currency') as mock_currency: + + yield { + 'counterparty': mock_counterparty, + 'date': mock_date, + 'location': mock_location, + 'staff': mock_staff, + 'design': mock_design, + 'sales': mock_sales, + 'purchase': mock_purchase, + 'payment': mock_payment, + 'currency': mock_currency + } + class TestReadFromS3: - # @pytest.mark.skip(reason="The test is broken!") def test_returns_dictionary_with_correct_value_pair( self, s3_client, mock_extract_bucket ): @@ -82,7 +105,6 @@ class TestReadFromS3: assert isinstance(result["Foods"], pd.DataFrame) assert result["Foods"].eq(expected_df, axis="columns").all(axis=None) - # @pytest.mark.skip(reason="The test is broken!") def test_returns_dictionary_of_dataframes_for_multiple_tables( self, s3_client, mock_extract_bucket ): @@ -116,7 +138,6 @@ class TestReadFromS3: ) assert list(result.keys()) == tables assert result["Foods"].eq(expected_foods_df, axis="columns").all(axis=None) - # assert result["Cars"].eq(expected_cars_df, axis="columns").all(axis=None) class TestListExistingFiles: @@ -171,7 +192,7 @@ class TestBucketName: class TestProcessToParquetUploadS3: - def test_func_uploads_to_s3(self, mock_transform_bucket, s3_client): + def test_func_doesnt_upload_if_file_exists(self, mock_transform_bucket, s3_client): expected_cars_df = pd.DataFrame( np.array( [ @@ -185,7 +206,102 @@ class TestProcessToParquetUploadS3: mock_dim_dict = {"car_data": expected_cars_df} response = process_to_parquet_and_upload_to_s3( - [], mock_dim_dict, {}, mock_transform_bucket, s3_client + ['car_data'], mock_dim_dict, {}, "dummy_transform_buc", s3_client + ) + + object_list = s3_client.list_objects_v2(Bucket='dummy_transform_buc') + s3_uploaded_files = [obj['Key'] for obj in object_list.get('Contents', [])] + assert 'car_data.parquet' not in s3_uploaded_files + assert response == {"uploaded": [], "not_uploaded": ['car_data']} + + def test_func_uploads_data_if_doesnt_exist(self, mock_transform_bucket, s3_client): + expected_flower_df = pd.DataFrame( + np.array( + [ + ["Daisy", "White", "Edible"], + ["Rose", "Red", "Yes"], + ["Daffodil", "Yellow", "No"], + ] + ), + columns=["Flower", "Colour", "Edible"], + ) + mock_dim_dict = {"flower_data": expected_flower_df} + + + response = process_to_parquet_and_upload_to_s3( + ['car_data'], mock_dim_dict, {}, "dummy_transform_buc", s3_client + ) + object_list = s3_client.list_objects_v2(Bucket='dummy_transform_buc') + s3_uploaded_files = [obj['Key'] for obj in object_list.get('Contents', [])] + # print(s3_uploaded_files, '<<<<<< the FILES IN DUMMY TRASN BUC') + + assert "flower_data.parquet" in s3_uploaded_files + assert response == {"uploaded": ['flower_data'], "not_uploaded": []} + + def test_func_uploads_mutable_and_immutable_files(self, mock_transform_bucket, s3_client): + expected_vegetable_df = pd.DataFrame( + np.array( + [ + ["Carrot", "Orange", "Edible"], + ["Broccoli", "Green", "Yes"], + ] + ), + columns=["Vegetable", "Colour", "Edible"], + ) + + expected_meat_df = pd.DataFrame( + np.array( + [ + ["Chicken", "White", "Yes"], + ["Beef", "Red", "No"], + ] + ), + columns=["Meat", "Colour", "Edible"], + ) + + mock_dim_dict = {"vegetable_data": expected_vegetable_df} + mock_fact_dict = {"meat_data": expected_meat_df} + + ##mocked an existing file + expected_vegetable_df.to_parquet("vegetable_data.parquet", engine="pyarrow") + s3_client.upload_file("vegetable_data.parquet", 'dummy_transform_buc', "vegetable_data.parquet") + + + response = process_to_parquet_and_upload_to_s3( + ['vegetable_data'], mock_dim_dict, mock_fact_dict, "dummy_transform_buc", s3_client ) + object_list = s3_client.list_objects_v2(Bucket='dummy_transform_buc') + s3_uploaded_files = [obj['Key'] for obj in object_list.get('Contents', [])] + + time_prefix = datetime.strftime(datetime.today(), "meat_data/%Y/%m/%d/meat_data_%H:%M:%S.parquet") + assert any(key.startswith("meat_data/") and key.endswith(".parquet") for key in s3_uploaded_files) + assert 'vegetable_data.parquet' in s3_uploaded_files + assert response == {"uploaded": ['meat_data'], "not_uploaded": ['vegetable_data']} + + def test_func_handles_empty_dicts(self, mock_transform_bucket, s3_client): + response = process_to_parquet_and_upload_to_s3( + [], {}, {}, 'dummy_transform_buc', s3_client + ) + + assert response == {"uploaded": [], "not_uploaded": []} + +class TestLambdaHandler: + def test_func_reads_from_extract_bucket(self, s3_client, mock_db_connection, mock_extract_bucket, mock_transform_bucket): + mock_csv = "id,name\n1,Lauryn\n2,Hill" + s3_client.put_object(Bucket='dummy_extract_buc', + Key="mock_table.csv", + Body=mock_csv) + + with patch('src.transform_lambda.transform_lambda.read_from_s3_subfolder_to_df') as mock_read, \ + patch('src.transform_lambda.transform_lambda.bucket_name', return_value="dummy_extract_buc") as mock_bucket_name: + + mock_read.return_value = {'sample_mock_table': pd.read_csv(io.StringIO(mock_csv))} + + lambda_handler({}, {}) + + mock_read.assert_called_once() + + args, kwargs = mock_read.call_args + assert kwargs.get('bucket') == mock_bucket_name.return_value + - assert response == {"uploaded": ["car_data"], "not_uploaded": []} |
