aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author: Alex <git@ajschof.me>, 2024-09-03 16:09:52 +0100
committer: GitHub <noreply@github.com>, 2024-09-03 16:09:52 +0100
commit: 06e0727d554b08c4be3db954acb4d281a9146712 (patch)
tree: 4152f9efe54364a5d6a6cc969befb6cea9015a5b
parent: cfe2a5fbc005d3eb766e788ea063f73c70bdca53 (diff)
parent: 256e4a2d4cb56814d3a87e89895f5954d148fd5d (diff)
download: de-project-bentley-06e0727d554b08c4be3db954acb4d281a9146712.tar.gz
download: de-project-bentley-06e0727d554b08c4be3db954acb4d281a9146712.zip
Merge pull request #117 from ajschofield/development
final pr: merge development into main branch
-rw-r--r-- .gitignore 9
-rw-r--r-- car_data.parquet bin 2827 -> 0 bytes
-rw-r--r-- requirements_lambda_01.txt 3
-rw-r--r-- requirements_lambda_02.txt 9
-rwxr-xr-x scripts/make_layer_zip.sh 17
-rw-r--r-- src/transform_lambda/transform_lambda.py 4
-rw-r--r-- terraform/events.tf 4
-rw-r--r-- terraform/lambda.tf 81
-rw-r--r-- tests/test_transform_lambda.py 162
9 files changed, 223 insertions, 66 deletions
diff --git a/.gitignore b/.gitignore
index 480ae4b..d5e8d2d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,11 +11,10 @@
*.zip
log*
__pycache__/
+*.parquet
+/dim_*
+/fact_*
# OS-Related Files
.DS_Store
-venv
-
-#files
-/dim_*
-/fact_* \ No newline at end of file
+venv \ No newline at end of file
diff --git a/car_data.parquet b/car_data.parquet
deleted file mode 100644
index 1853af6..0000000
--- a/car_data.parquet
+++ /dev/null
Binary files differ
diff --git a/requirements_lambda_01.txt b/requirements_lambda_01.txt
new file mode 100644
index 0000000..10f56be
--- /dev/null
+++ b/requirements_lambda_01.txt
@@ -0,0 +1,3 @@
+boto3
+botocore
+pg8000 \ No newline at end of file
diff --git a/requirements_lambda_02.txt b/requirements_lambda_02.txt
new file mode 100644
index 0000000..20c88d7
--- /dev/null
+++ b/requirements_lambda_02.txt
@@ -0,0 +1,9 @@
+pandas
+pyarrow
+SQLAlchemy
+auto_mix_prep
+beautifulsoup4
+boto3
+botocore
+pg8000
+Requests \ No newline at end of file
diff --git a/scripts/make_layer_zip.sh b/scripts/make_layer_zip.sh
index eabe301..7f64873 100755
--- a/scripts/make_layer_zip.sh
+++ b/scripts/make_layer_zip.sh
@@ -1,8 +1,17 @@
-# Description: Make the zip file for the layer
+# Description: Build both Lambda layer zips: layer_01 (extract) and layer_02 (transform & load)
cd "$(dirname "$0")/.."
+
+# Layer 01 (extract lambda dependencies)
+mkdir -p python/lib/python3.11/site-packages
+pip3 install --upgrade -r requirements_lambda_01.txt -t python/lib/python3.11/site-packages
+rm -f layer_01.zip  # -f: a missing zip (first run / clean checkout) is not an error
+zip -r layer_01.zip python
+rm -r python/
+
+# Layer 02 (transform & load lambda dependencies); python/ is rebuilt from scratch
mkdir -p python/lib/python3.11/site-packages
-pip3 install --upgrade -r requirements.txt -t python/lib/python3.11/site-packages
-rm layer.zip
-zip -r layer.zip python
+pip3 install --upgrade -r requirements_lambda_02.txt -t python/lib/python3.11/site-packages
+rm -f layer_02.zip  # -f: a missing zip (first run / clean checkout) is not an error
+zip -r layer_02.zip python
rm -r python/
diff --git a/src/transform_lambda/transform_lambda.py b/src/transform_lambda/transform_lambda.py
index f782922..54d7d48 100644
--- a/src/transform_lambda/transform_lambda.py
+++ b/src/transform_lambda/transform_lambda.py
@@ -10,7 +10,6 @@ from botocore.exceptions import ClientError
from pg8000.native import Connection, InterfaceError
from datetime import datetime
-
class DBConnectionException(Exception):
"""Wraps pg8000.native Error or DatabaseError."""
@@ -54,6 +53,7 @@ def lambda_handler(event, context):
bucket = bucket_name("transform")
existing_s3_files = list_existing_s3_files(bucket)
+ # print(existing_s3_files)
dict_of_df = read_from_s3_subfolder_to_df(
TABLES, bucket_name("extract"), client=boto3.client("s3")
@@ -123,11 +123,13 @@ def process_to_parquet_and_upload_to_s3(
# changed parquet_file variable to the file name
client.upload_file(f"{table_name}.parquet", bucket, f"{table_name}.parquet")
status["uploaded"].append(table_name)
+ print(status)
for table_name, df in mutable_df_dict.items():
s3_key = datetime.strftime(
datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.parquet"
)
+ print(s3_key, '<<<< this is S3_Key')
parquet_file = df.to_parquet(
f"{table_name}.parquet", engine="pyarrow"
) # or fastparquet
diff --git a/terraform/events.tf b/terraform/events.tf
index 53ae10a..7f8f641 100644
--- a/terraform/events.tf
+++ b/terraform/events.tf
@@ -86,7 +86,7 @@ resource "aws_s3_bucket_notification" "extract_bucket_notification" {
resource "aws_lambda_permission" "allow_s3_transform_bucket" {
statement_id = "AllowS3InvokeLambdaTransform${random_string.s3_transform_suffix.result}"
action = "lambda:InvokeFunction"
- function_name = aws_lambda_function.transform_lambda.function_name
+ function_name = aws_lambda_function.load_lambda.function_name
principal = "s3.amazonaws.com"
source_arn = aws_s3_bucket.transform_bucket.arn
@@ -102,7 +102,7 @@ resource "aws_s3_bucket_notification" "transform_bucket_notification" {
lambda_function {
events = ["s3:ObjectCreated:*"]
- lambda_function_arn = aws_lambda_function.transform_lambda.arn
+ lambda_function_arn = aws_lambda_function.load_lambda.arn
}
depends_on = [aws_lambda_permission.allow_s3_transform_bucket]
diff --git a/terraform/lambda.tf b/terraform/lambda.tf
index b6f36fb..1e12180 100644
--- a/terraform/lambda.tf
+++ b/terraform/lambda.tf
@@ -3,46 +3,65 @@
####################
locals {
- layer_dir = "../"
- layer_zip = "layer.zip"
- layer_name = "lambda_layer"
- script_dir = "../scripts"
- layer_zip_path = "${local.layer_dir}/${local.layer_zip}"
+ layer_dir = "../"
+ layer_zip_01 = "layer_01.zip"
+ layer_zip_02 = "layer_02.zip"
+ layer_name_01 = "lambda_layer_01"
+ layer_name_02 = "lambda_layer_02"
+ script_dir = "../scripts"
+ layer_zip_01_path = "${local.layer_dir}${local.layer_zip_01}"
+ layer_zip_02_path = "${local.layer_dir}${local.layer_zip_02}"
}
-######################
-# Lambda Layer Setup #
-######################
-
resource "null_resource" "prepare_layer" {
-
- # New change: only run the script if the layer zip does not exist
-
+ provisioner "local-exec" {
+ command = "bash ${local.script_dir}/make_layer_zip.sh"
+ }
triggers = {
- layer_zip_exists = fileexists(local.layer_zip_path) ? "exists" : "not_exists"
+ always_run = timestamp()
}
+}
- provisioner "local-exec" {
- command = "if [ ! -f ${local.layer_zip_path} ]; then bash ${local.script_dir}/make_layer_zip.sh; fi"
- }
+################################
+# Lambda Layer (Extract) Setup #
+################################
+resource "aws_s3_object" "lambda_layer_zip_01" {
+ bucket = aws_s3_bucket.lambda_code_bucket.id #bucket instead of id
+ key = "${local.layer_name_01}/${local.layer_zip_01}"
+ source = "${local.layer_dir}${local.layer_zip_01}"
+ depends_on = [null_resource.prepare_layer]
+ etag = fileexists(local.layer_zip_01_path) ? filemd5(local.layer_zip_01_path) : null
+ force_destroy = true
+}
+
+resource "aws_lambda_layer_version" "lambda_layer_01" {
+ layer_name = local.layer_name_01
+ compatible_runtimes = ["python3.11"]
+ s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket
+ s3_key = aws_s3_object.lambda_layer_zip_01.key
+ source_code_hash = fileexists(local.layer_zip_01_path) ? filebase64sha256(local.layer_zip_01_path) : null
+ depends_on = [aws_s3_object.lambda_layer_zip_01]
}
-resource "aws_s3_object" "lambda_layer_zip" {
- bucket = aws_s3_bucket.lambda_code_bucket.id #bucket instead of id
- key = "${local.layer_name}/${local.layer_zip}"
- source = "${local.layer_dir}/${local.layer_zip}"
- depends_on = [null_resource.prepare_layer]
- etag = fileexists(local.layer_zip_path) ? filemd5(local.layer_zip_path) : null
+#########################################
+# Lambda Layer (Load & Transform) Setup #
+#########################################
+resource "aws_s3_object" "lambda_layer_zip_02" {
+ bucket = aws_s3_bucket.lambda_code_bucket.id #bucket instead of id
+ key = "${local.layer_name_02}/${local.layer_zip_02}"
+ source = "${local.layer_dir}${local.layer_zip_02}"
+ depends_on = [null_resource.prepare_layer]
+ etag = fileexists(local.layer_zip_02_path) ? filemd5(local.layer_zip_02_path) : null
+ force_destroy = true
}
-resource "aws_lambda_layer_version" "lambda_layer" {
- layer_name = local.layer_name
+resource "aws_lambda_layer_version" "lambda_layer_02" {
+ layer_name = local.layer_name_02
compatible_runtimes = ["python3.11"]
s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket
- s3_key = aws_s3_object.lambda_layer_zip.key
- source_code_hash = fileexists(local.layer_zip_path) ? filebase64sha256(local.layer_zip_path) : null
- skip_destroy = true
- depends_on = [aws_s3_object.lambda_layer_zip]
+ s3_key = aws_s3_object.lambda_layer_zip_02.key
+ source_code_hash = fileexists(local.layer_zip_02_path) ? filebase64sha256(local.layer_zip_02_path) : null
+ depends_on = [aws_s3_object.lambda_layer_zip_02]
}
###########################
@@ -65,7 +84,7 @@ resource "aws_lambda_function" "extract_lambda" {
function_name = var.extract_lambda_name
s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket
s3_key = aws_s3_object.extract_lambda_code.key
- layers = [aws_lambda_layer_version.lambda_layer.arn]
+ layers = [aws_lambda_layer_version.lambda_layer_01.arn]
role = aws_iam_role.multi_service_role.arn
handler = "extract_lambda.lambda_handler"
runtime = "python3.11"
@@ -101,7 +120,7 @@ resource "aws_lambda_function" "transform_lambda" {
function_name = var.transform_lambda_name
s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket
s3_key = aws_s3_object.transform_lambda_code.key
- layers = [aws_lambda_layer_version.lambda_layer.arn]
+ layers = [aws_lambda_layer_version.lambda_layer_02.arn]
role = aws_iam_role.multi_service_role.arn
handler = "transform_lambda.lambda_handler"
runtime = "python3.11"
@@ -135,7 +154,7 @@ resource "aws_lambda_function" "load_lambda" {
function_name = var.load_lambda_name
s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket
s3_key = aws_s3_object.load_lambda_code.key
- layers = [aws_lambda_layer_version.lambda_layer.arn]
+ layers = [aws_lambda_layer_version.lambda_layer_02.arn]
role = aws_iam_role.multi_service_role.arn
handler = "load_lambda.lambda_handler"
runtime = "python3.11"
diff --git a/tests/test_transform_lambda.py b/tests/test_transform_lambda.py
index 35d7e3c..73bd9b3 100644
--- a/tests/test_transform_lambda.py
+++ b/tests/test_transform_lambda.py
@@ -1,20 +1,15 @@
-from src.transform_lambda.transform_lambda import (
- read_from_s3_subfolder_to_df,
- list_existing_s3_files,
- bucket_name,
- process_to_parquet_and_upload_to_s3,
-)
-from moto import mock_aws
-import pytest
-import pandas as pd
+from datetime import datetime
+import logging
+import io
import os
+import numpy as np
+from unittest.mock import patch, MagicMock
import boto3
+import pandas as pd
+from moto import mock_aws
from botocore.exceptions import ClientError
-import numpy as np
-
-# import caplog
-import logging
-
+import pytest
+from src.transform_lambda.transform_lambda import read_from_s3_subfolder_to_df, list_existing_s3_files, bucket_name, process_to_parquet_and_upload_to_s3, lambda_handler
logger = logging.getLogger()
logger.setLevel(logging.INFO)
@@ -24,7 +19,7 @@ logger.setLevel(logging.INFO)
def aws_credentials():
os.environ["AWS_ACCESS_KEY_ID"] = "testing"
os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
- os.environ["AWS_SECURIT_TOKEN"] = "testing"
+ os.environ["AWS_SECURITY_TOKEN"] = "testing"
os.environ["AWS_SESSION_TOKEN"] = "testing"
os.environ["AWS_DEFAULT_REGION"] = "eu-west-2"
@@ -34,7 +29,6 @@ def s3_client(aws_credentials):
with mock_aws():
yield boto3.client("s3")
-
@pytest.fixture(scope="class")
def mock_extract_bucket(s3_client):
mock_extract_bucket = s3_client.create_bucket(
@@ -43,7 +37,6 @@ def mock_extract_bucket(s3_client):
)
return mock_extract_bucket
-
@pytest.fixture(scope="class")
def mock_transform_bucket(s3_client):
mock_transform_bucket = s3_client.create_bucket(
@@ -52,9 +45,39 @@ def mock_transform_bucket(s3_client):
)
return mock_transform_bucket
+@pytest.fixture
+def mock_db_connection():
+ with patch('src.transform_lambda.transform_lambda.connect_to_database') as mock_connect:
+ mock_db = MagicMock()
+ mock_connect.return_value = mock_db
+ yield mock_db
+
+@pytest.fixture(scope="function")
+def mock_df_functions():
+    """Patch the dim/fact DataFrame builders and yield the mocks keyed by short name.
+
+    NOTE(review): assumes the create_* builders are defined in (or imported
+    into) the transform lambda module - confirm against transform_lambda.py.
+    """
+    # patch() must target the module where the name is looked up at call time;
+    # the 'your_module' placeholder raised ModuleNotFoundError on first use.
+    target = 'src.transform_lambda.transform_lambda'
+    with patch(f'{target}.create_dim_counterparty') as mock_counterparty, \
+         patch(f'{target}.create_dim_date') as mock_date, \
+         patch(f'{target}.create_dim_location') as mock_location, \
+         patch(f'{target}.create_dim_staff') as mock_staff, \
+         patch(f'{target}.create_dim_design') as mock_design, \
+         patch(f'{target}.create_fact_sales_order') as mock_sales, \
+         patch(f'{target}.create_fact_purchase_orders') as mock_purchase, \
+         patch(f'{target}.create_fact_payment') as mock_payment, \
+         patch(f'{target}.create_dim_currency') as mock_currency:
+
+        yield {
+            'counterparty': mock_counterparty,
+            'date': mock_date,
+            'location': mock_location,
+            'staff': mock_staff,
+            'design': mock_design,
+            'sales': mock_sales,
+            'purchase': mock_purchase,
+            'payment': mock_payment,
+            'currency': mock_currency,
+        }
+
class TestReadFromS3:
- # @pytest.mark.skip(reason="The test is broken!")
def test_returns_dictionary_with_correct_value_pair(
self, s3_client, mock_extract_bucket
):
@@ -82,7 +105,6 @@ class TestReadFromS3:
assert isinstance(result["Foods"], pd.DataFrame)
assert result["Foods"].eq(expected_df, axis="columns").all(axis=None)
- # @pytest.mark.skip(reason="The test is broken!")
def test_returns_dictionary_of_dataframes_for_multiple_tables(
self, s3_client, mock_extract_bucket
):
@@ -116,7 +138,6 @@ class TestReadFromS3:
)
assert list(result.keys()) == tables
assert result["Foods"].eq(expected_foods_df, axis="columns").all(axis=None)
- # assert result["Cars"].eq(expected_cars_df, axis="columns").all(axis=None)
class TestListExistingFiles:
@@ -171,7 +192,7 @@ class TestBucketName:
class TestProcessToParquetUploadS3:
- def test_func_uploads_to_s3(self, mock_transform_bucket, s3_client):
+ def test_func_doesnt_upload_if_file_exists(self, mock_transform_bucket, s3_client):
expected_cars_df = pd.DataFrame(
np.array(
[
@@ -185,7 +206,102 @@ class TestProcessToParquetUploadS3:
mock_dim_dict = {"car_data": expected_cars_df}
response = process_to_parquet_and_upload_to_s3(
- [], mock_dim_dict, {}, mock_transform_bucket, s3_client
+ ['car_data'], mock_dim_dict, {}, "dummy_transform_buc", s3_client
+ )
+
+ object_list = s3_client.list_objects_v2(Bucket='dummy_transform_buc')
+ s3_uploaded_files = [obj['Key'] for obj in object_list.get('Contents', [])]
+ assert 'car_data.parquet' not in s3_uploaded_files
+ assert response == {"uploaded": [], "not_uploaded": ['car_data']}
+
+ def test_func_uploads_data_if_doesnt_exist(self, mock_transform_bucket, s3_client):
+ expected_flower_df = pd.DataFrame(
+ np.array(
+ [
+ ["Daisy", "White", "Edible"],
+ ["Rose", "Red", "Yes"],
+ ["Daffodil", "Yellow", "No"],
+ ]
+ ),
+ columns=["Flower", "Colour", "Edible"],
+ )
+ mock_dim_dict = {"flower_data": expected_flower_df}
+
+
+ response = process_to_parquet_and_upload_to_s3(
+ ['car_data'], mock_dim_dict, {}, "dummy_transform_buc", s3_client
+ )
+ object_list = s3_client.list_objects_v2(Bucket='dummy_transform_buc')
+ s3_uploaded_files = [obj['Key'] for obj in object_list.get('Contents', [])]
+ # print(s3_uploaded_files, '<<<<<< the FILES IN DUMMY TRASN BUC')
+
+ assert "flower_data.parquet" in s3_uploaded_files
+ assert response == {"uploaded": ['flower_data'], "not_uploaded": []}
+
+ def test_func_uploads_mutable_and_immutable_files(self, mock_transform_bucket, s3_client):
+ expected_vegetable_df = pd.DataFrame(
+ np.array(
+ [
+ ["Carrot", "Orange", "Edible"],
+ ["Broccoli", "Green", "Yes"],
+ ]
+ ),
+ columns=["Vegetable", "Colour", "Edible"],
+ )
+
+ expected_meat_df = pd.DataFrame(
+ np.array(
+ [
+ ["Chicken", "White", "Yes"],
+ ["Beef", "Red", "No"],
+ ]
+ ),
+ columns=["Meat", "Colour", "Edible"],
+ )
+
+ mock_dim_dict = {"vegetable_data": expected_vegetable_df}
+ mock_fact_dict = {"meat_data": expected_meat_df}
+
+ ##mocked an existing file
+ expected_vegetable_df.to_parquet("vegetable_data.parquet", engine="pyarrow")
+ s3_client.upload_file("vegetable_data.parquet", 'dummy_transform_buc', "vegetable_data.parquet")
+
+
+ response = process_to_parquet_and_upload_to_s3(
+ ['vegetable_data'], mock_dim_dict, mock_fact_dict, "dummy_transform_buc", s3_client
)
+ object_list = s3_client.list_objects_v2(Bucket='dummy_transform_buc')
+ s3_uploaded_files = [obj['Key'] for obj in object_list.get('Contents', [])]
+
+ time_prefix = datetime.strftime(datetime.today(), "meat_data/%Y/%m/%d/meat_data_%H:%M:%S.parquet")
+ assert any(key.startswith("meat_data/") and key.endswith(".parquet") for key in s3_uploaded_files)
+ assert 'vegetable_data.parquet' in s3_uploaded_files
+ assert response == {"uploaded": ['meat_data'], "not_uploaded": ['vegetable_data']}
+
+ def test_func_handles_empty_dicts(self, mock_transform_bucket, s3_client):
+ response = process_to_parquet_and_upload_to_s3(
+ [], {}, {}, 'dummy_transform_buc', s3_client
+ )
+
+ assert response == {"uploaded": [], "not_uploaded": []}
+
+class TestLambdaHandler:
+    """Tests for the transform lambda entry point."""
+
+    def test_func_reads_from_extract_bucket(self, s3_client, mock_db_connection, mock_extract_bucket, mock_transform_bucket):
+        """lambda_handler reads the source tables from the bucket returned by bucket_name."""
+        mock_csv = "id,name\n1,Lauryn\n2,Hill"
+        s3_client.put_object(Bucket='dummy_extract_buc',
+                             Key="mock_table.csv",
+                             Body=mock_csv)
+
+        with patch('src.transform_lambda.transform_lambda.read_from_s3_subfolder_to_df') as mock_read, \
+             patch('src.transform_lambda.transform_lambda.bucket_name', return_value="dummy_extract_buc") as mock_bucket_name:
+
+            mock_read.return_value = {'sample_mock_table': pd.read_csv(io.StringIO(mock_csv))}
+
+            lambda_handler({}, {})
+
+            mock_read.assert_called_once()
+
+            args, kwargs = mock_read.call_args
+            # The handler passes the bucket positionally (tables, bucket, client=...),
+            # so kwargs.get('bucket') alone is always None; accept either convention.
+            called_bucket = kwargs.get('bucket', args[1] if len(args) > 1 else None)
+            assert called_bucket == mock_bucket_name.return_value
+
- assert response == {"uploaded": ["car_data"], "not_uploaded": []}
git.ajschof.me — hosted by ajschofield — powered by cgit