From 836f71dbea59a35b2eeeeeb982a73c4366089722 Mon Sep 17 00:00:00 2001 From: HastarTara Date: Tue, 27 Aug 2024 12:33:03 +0100 Subject: tests for bucket_name helper --- src/transform_lambda.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) (limited to 'src') diff --git a/src/transform_lambda.py b/src/transform_lambda.py index 2cd9272..cd9541d 100644 --- a/src/transform_lambda.py +++ b/src/transform_lambda.py @@ -1,3 +1,4 @@ +from src.dataframes import * import json import boto3 import re @@ -5,7 +6,6 @@ import logging import pandas as pd import pyarrow as pa import pyarrow.parquet as pq -from dataframes import * from botocore.exceptions import ClientError from pg8000.native import Connection, InterfaceError from datetime import datetime @@ -183,13 +183,18 @@ def read_from_s3_subfolder_to_df(tables, bucket, client=boto3.client("s3")): def bucket_name(bucket_prefix, client=boto3.client("s3")): + # response = client.list_buckets() + # for bucket in response["Buckets"]: + # if bucket_prefix in bucket["Name"]: + # return bucket["Name"] + + response = client.list_buckets() bucket_filter = [ - bucket["Name"] - for bucket in response["Buckets"] - if bucket_prefix in bucket["Name"] - ] - + bucket["Name"] + for bucket in response["Buckets"] + if bucket_prefix in bucket["Name"] + ] return bucket_filter[0] -- cgit v1.2.3 From ad357ff34202827720dc216562dfbb0fbd65c297 Mon Sep 17 00:00:00 2001 From: HastarTara Date: Tue, 27 Aug 2024 17:02:25 +0100 Subject: test updates to transform lambda handler --- car_data.parquet | Bin 0 -> 2827 bytes src/transform_lambda.py | 59 ++++++++++++++++++++++++----------------- tests/test_transform_lambda.py | 39 +++++++++++++++++++++++++-- 3 files changed, 71 insertions(+), 27 deletions(-) create mode 100644 car_data.parquet (limited to 'src') diff --git a/car_data.parquet b/car_data.parquet new file mode 100644 index 0000000..1853af6 Binary files /dev/null and b/car_data.parquet differ diff --git a/src/transform_lambda.py b/src/transform_lambda.py index cd9541d..9830e0f 100644 --- a/src/transform_lambda.py +++ b/src/transform_lambda.py @@ -9,7 +9,7 @@ import pyarrow.parquet as pq from botocore.exceptions import ClientError from pg8000.native import Connection, InterfaceError from datetime import datetime - +import io class DBConnectionException(Exception): """Wraps pg8000.native Error or DatabaseError.""" @@ -59,6 +59,8 @@ def lambda_handler(event, context): TABLES, bucket_name("extract"), client=boto3.client("s3") ) + print(dict_of_df) + immutable_df_dict = { "dim_counterparty": create_dim_counterparty(dict_of_df), "dim_date": create_dim_date(dict_of_df), @@ -106,7 +108,7 @@ def process_to_parquet_and_upload_to_s3( immutable_df_dict, mutable_df_dict, bucket, - client=boto3.client("s3"), + client=boto3.client("s3") ): status = {"uploaded": [], "not_uploaded": []} @@ -114,21 +116,25 @@ def process_to_parquet_and_upload_to_s3( if table_name in existing_s3_files: status["not_uploaded"].append(table_name) else: - parquet_file = df.to_parquet( - f"{table_name}.parquet", engine="pyarrow" - ) # or fastparquet - client.upload_file(parquet_file, bucket, f"{table_name}.parquet") + parquet_buffer = io.BytesIO() + + df.to_parquet(parquet_buffer, engine="pyarrow") # or engine="fastparquet" + + parquet_buffer.seek(0) + + client.upload_fileobj(parquet_buffer, bucket, f"{table_name}.parquet") + status["uploaded"].append(table_name) - for table_name, df in mutable_df_dict.items(): - s3_key = datetime.strftime( - datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.parquet" - ) - parquet_file = df.to_parquet( - f"{table_name}.parquet", engine="pyarrow" - ) # or fastparquet - client.upload_file(parquet_file, bucket, s3_key) - status["uploaded"].append(table_name) + # for table_name, df in mutable_df_dict.items(): + # s3_key = datetime.strftime( + # datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.parquet" + # ) + # parquet_file = df.to_parquet( + # f"{table_name}.parquet", engine="pyarrow" + # ) # or fastparquet + # client.upload_file(parquet_file, bucket, s3_key) + # status["uploaded"].append(table_name) return status @@ -182,20 +188,23 @@ def read_from_s3_subfolder_to_df(tables, bucket, client=boto3.client("s3")): return table_dfs + + def bucket_name(bucket_prefix, client=boto3.client("s3")): - # response = client.list_buckets() - # for bucket in response["Buckets"]: - # if bucket_prefix in bucket["Name"]: - # return bucket["Name"] - - - response = client.list_buckets() - bucket_filter = [ + + response = client.list_buckets() + bucket_filter = [ bucket["Name"] for bucket in response["Buckets"] if bucket_prefix in bucket["Name"] - ] - return bucket_filter[0] + ] + if not bucket_filter: + raise ValueError(f"No bucket found with prefix: {bucket_prefix}") + + return bucket_filter[0] + + + def list_existing_s3_files(bucket_name, client=boto3.client("s3")): diff --git a/tests/test_transform_lambda.py b/tests/test_transform_lambda.py index cc4e07a..b4836c2 100644 --- a/tests/test_transform_lambda.py +++ b/tests/test_transform_lambda.py @@ -1,7 +1,7 @@ from src.transform_lambda import ( read_from_s3_subfolder_to_df, list_existing_s3_files, - bucket_name, + bucket_name, process_to_parquet_and_upload_to_s3 ) from moto import mock_aws import pytest @@ -152,4 +152,39 @@ class TestBucketName: def test_transform_bucket_name(self, mock_extract_bucket, mock_transform_bucket, s3_client): bucket2 = bucket_name('dummy_transform_buc', s3_client) assert bucket2 == 'dummy_transform_buc' - \ No newline at end of file + + + def test_recieves_error_when_bucket_doesnt_exist(self, mock_extract_bucket, s3_client): + s3_client.delete_bucket(Bucket='dummy_extract_buc') + with pytest.raises(ValueError): + bucket_name('dummy_extract_buc', s3_client) + + + + + + +class TestProcessToParquetUploadS3: + def test_func_uploads_to_s3(self, mock_transform_bucket, s3_client): + + expected_cars_df = pd.DataFrame( + np.array( + [ + ["Truck", "Chevrolet", "Grey"], + ["Convertible", "Mercedes", "Red"], + ["Van", "Volkswagen", "Blue"], + ] + ), + columns=["Car_type", "Brand", "Colour"], + ) + mock_dim_dict = {'car_data': expected_cars_df} + + response = process_to_parquet_and_upload_to_s3([], mock_dim_dict, {}, mock_transform_bucket, s3_client) + + + assert response == {"uploaded": ["car_data"], "not_uploaded": []} + + + + + -- cgit v1.2.3 From 3f24ec753902feecec4c17e2877e19853bde1bb2 Mon Sep 17 00:00:00 2001 From: "deepsource-autofix[bot]" <62050782+deepsource-autofix[bot]@users.noreply.github.com> Date: Wed, 28 Aug 2024 09:59:43 +0000 Subject: style: format code with Autopep8, Black and Ruff Formatter This commit fixes the style issues introduced in ad357ff according to the output from Autopep8, Black and Ruff Formatter. Details: https://github.com/ajschofield/de-project-bentley/pull/105 --- src/transform_lambda.py | 40 +++++++++++------------ tests/test_transform_lambda.py | 73 +++++++++++++++++++++--------------------- 2 files changed, 55 insertions(+), 58 deletions(-) (limited to 'src') diff --git a/src/transform_lambda.py b/src/transform_lambda.py index 9830e0f..3b1e9e6 100644 --- a/src/transform_lambda.py +++ b/src/transform_lambda.py @@ -11,6 +11,7 @@ from pg8000.native import Connection, InterfaceError from datetime import datetime import io + class DBConnectionException(Exception): """Wraps pg8000.native Error or DatabaseError.""" @@ -108,7 +109,7 @@ def process_to_parquet_and_upload_to_s3( immutable_df_dict, mutable_df_dict, bucket, - client=boto3.client("s3") + client=boto3.client("s3"), ): status = {"uploaded": [], "not_uploaded": []} @@ -117,13 +118,14 @@ def process_to_parquet_and_upload_to_s3( status["not_uploaded"].append(table_name) else: parquet_buffer = io.BytesIO() - - df.to_parquet(parquet_buffer, engine="pyarrow") # or engine="fastparquet" - + + # or engine="fastparquet" + df.to_parquet(parquet_buffer, engine="pyarrow") + parquet_buffer.seek(0) - + client.upload_fileobj(parquet_buffer, bucket, f"{table_name}.parquet") - + status["uploaded"].append(table_name) # for table_name, df in mutable_df_dict.items(): @@ -188,23 +190,17 @@ def read_from_s3_subfolder_to_df(tables, bucket, client=boto3.client("s3")): return table_dfs - - def bucket_name(bucket_prefix, client=boto3.client("s3")): - - response = client.list_buckets() - bucket_filter = [ - bucket["Name"] - for bucket in response["Buckets"] - if bucket_prefix in bucket["Name"] - ] - if not bucket_filter: - raise ValueError(f"No bucket found with prefix: {bucket_prefix}") - - return bucket_filter[0] - - - + response = client.list_buckets() + bucket_filter = [ + bucket["Name"] + for bucket in response["Buckets"] + if bucket_prefix in bucket["Name"] + ] + if not bucket_filter: + raise ValueError(f"No bucket found with prefix: {bucket_prefix}") + + return bucket_filter[0] def list_existing_s3_files(bucket_name, client=boto3.client("s3")): diff --git a/tests/test_transform_lambda.py b/tests/test_transform_lambda.py index b4836c2..6cf3a09 100644 --- a/tests/test_transform_lambda.py +++ b/tests/test_transform_lambda.py @@ -1,7 +1,8 @@ from src.transform_lambda import ( read_from_s3_subfolder_to_df, list_existing_s3_files, - bucket_name, process_to_parquet_and_upload_to_s3 + bucket_name, + process_to_parquet_and_upload_to_s3, ) from moto import mock_aws import pytest @@ -33,28 +34,30 @@ def s3_client(aws_credentials): with mock_aws(): yield boto3.client("s3") + @pytest.fixture(scope="class") def mock_extract_bucket(s3_client): mock_extract_bucket = s3_client.create_bucket( - Bucket="dummy_extract_buc", - CreateBucketConfiguration={"LocationConstraint": "eu-west-2"}, - ) + Bucket="dummy_extract_buc", + CreateBucketConfiguration={"LocationConstraint": "eu-west-2"}, + ) return mock_extract_bucket - + + @pytest.fixture(scope="class") def mock_transform_bucket(s3_client): mock_transform_bucket = s3_client.create_bucket( - Bucket="dummy_transform_buc", - CreateBucketConfiguration={"LocationConstraint": "eu-west-2"}, - ) + Bucket="dummy_transform_buc", + CreateBucketConfiguration={"LocationConstraint": "eu-west-2"}, + ) return mock_transform_bucket - class TestReadFromS3: # @pytest.mark.skip(reason="The test is broken!") - def test_returns_dictionary_with_correct_value_pair(self, s3_client, mock_extract_bucket): - + def test_returns_dictionary_with_correct_value_pair( + self, s3_client, mock_extract_bucket + ): s3_client.upload_file( "tests/dummy_identical.csv", "dummy_extract_buc", @@ -80,9 +83,13 @@ class TestReadFromS3: assert result["Foods"].eq(expected_df, axis="columns").all(axis=None) # @pytest.mark.skip(reason="The test is broken!") - def test_returns_dictionary_of_dataframes_for_multiple_tables(self, s3_client, mock_extract_bucket): + def test_returns_dictionary_of_dataframes_for_multiple_tables( + self, s3_client, mock_extract_bucket + ): s3_client.upload_file( - "tests/dummy_2.csv", "dummy_extract_buc", "Cars/2024/08/21/Cars_14:03:56.csv" + "tests/dummy_2.csv", + "dummy_extract_buc", + "Cars/2024/08/21/Cars_14:03:56.csv", ) tables = ["Foods", "Cars"] result = read_from_s3_subfolder_to_df( @@ -143,30 +150,28 @@ class TestListExistingFiles: class TestBucketName: - def test_functions_retrieves__extractbucket(self, mock_extract_bucket, mock_transform_bucket,s3_client): - + def test_functions_retrieves__extractbucket( + self, mock_extract_bucket, mock_transform_bucket, s3_client + ): bucket = bucket_name("dummy_extract_buc", s3_client) assert bucket == "dummy_extract_buc" + def test_transform_bucket_name( + self, mock_extract_bucket, mock_transform_bucket, s3_client + ): + bucket2 = bucket_name("dummy_transform_buc", s3_client) + assert bucket2 == "dummy_transform_buc" - def test_transform_bucket_name(self, mock_extract_bucket, mock_transform_bucket, s3_client): - bucket2 = bucket_name('dummy_transform_buc', s3_client) - assert bucket2 == 'dummy_transform_buc' - - - def test_recieves_error_when_bucket_doesnt_exist(self, mock_extract_bucket, s3_client): - s3_client.delete_bucket(Bucket='dummy_extract_buc') + def test_recieves_error_when_bucket_doesnt_exist( + self, mock_extract_bucket, s3_client + ): + s3_client.delete_bucket(Bucket="dummy_extract_buc") with pytest.raises(ValueError): - bucket_name('dummy_extract_buc', s3_client) - - - - + bucket_name("dummy_extract_buc", s3_client) class TestProcessToParquetUploadS3: def test_func_uploads_to_s3(self, mock_transform_bucket, s3_client): - expected_cars_df = pd.DataFrame( np.array( [ @@ -177,14 +182,10 @@ class TestProcessToParquetUploadS3: ), columns=["Car_type", "Brand", "Colour"], ) - mock_dim_dict = {'car_data': expected_cars_df} - - response = process_to_parquet_and_upload_to_s3([], mock_dim_dict, {}, mock_transform_bucket, s3_client) + mock_dim_dict = {"car_data": expected_cars_df} + response = process_to_parquet_and_upload_to_s3( + [], mock_dim_dict, {}, mock_transform_bucket, s3_client + ) assert response == {"uploaded": ["car_data"], "not_uploaded": []} - - - - - -- cgit v1.2.3 From c6e711bd4196ba1c5b65218d347da1e7b98cac12 Mon Sep 17 00:00:00 2001 From: "deepsource-autofix[bot]" <62050782+deepsource-autofix[bot]@users.noreply.github.com> Date: Wed, 28 Aug 2024 10:37:48 +0000 Subject: style: format code with Autopep8, Black and Ruff Formatter This commit fixes the style issues introduced in 4651e2f according to the output from Autopep8, Black and Ruff Formatter. Details: https://github.com/ajschofield/de-project-bentley/pull/106 --- src/transform_lambda/transform_lambda.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) (limited to 'src') diff --git a/src/transform_lambda/transform_lambda.py b/src/transform_lambda/transform_lambda.py index c25ab39..8a2cae8 100644 --- a/src/transform_lambda/transform_lambda.py +++ b/src/transform_lambda/transform_lambda.py @@ -11,7 +11,6 @@ from pg8000.native import Connection, InterfaceError from datetime import datetime - class DBConnectionException(Exception): """Wraps pg8000.native Error or DatabaseError.""" @@ -115,13 +114,16 @@ def process_to_parquet_and_upload_to_s3( if table_name in existing_s3_files: status["not_uploaded"].append(table_name) else: -<<<<<<< HEAD:src/transform_lambda/transform_lambda.py + + +<< << << < HEAD: src/transform_lambda/transform_lambda.py parquet_file = df.to_parquet( f"{table_name}.parquet", engine="pyarrow" ) # or fastparquet # changed parquet_file variable to the file name - client.upload_file(f"{table_name}.parquet", bucket, f"{table_name}.parquet") -======= + client.upload_file(f"{table_name}.parquet", + bucket, f"{table_name}.parquet") +== == == = parquet_buffer = io.BytesIO() # or engine="fastparquet" @@ -129,9 +131,10 @@ def process_to_parquet_and_upload_to_s3( parquet_buffer.seek(0) - client.upload_fileobj(parquet_buffer, bucket, f"{table_name}.parquet") + client.upload_fileobj(parquet_buffer, bucket, + f"{table_name}.parquet") ->>>>>>> 3f24ec753902feecec4c17e2877e19853bde1bb2:src/transform_lambda.py +>>>>>> > 3f24ec753902feecec4c17e2877e19853bde1bb2: src/transform_lambda.py status["uploaded"].append(table_name) for table_name, df in mutable_df_dict.items(): -- cgit v1.2.3 From 6c8567770042ad547366f0f02b091379a88d60d6 Mon Sep 17 00:00:00 2001 From: Alex Date: Wed, 28 Aug 2024 10:50:47 +0000 Subject: chore: get out of merge hell --- src/transform_lambda/transform_lambda.py | 21 ++------------------- 1 file changed, 2 insertions(+), 19 deletions(-) (limited to 'src') diff --git a/src/transform_lambda/transform_lambda.py b/src/transform_lambda/transform_lambda.py index 8a2cae8..02e9887 100644 --- a/src/transform_lambda/transform_lambda.py +++ b/src/transform_lambda/transform_lambda.py @@ -114,27 +114,12 @@ def process_to_parquet_and_upload_to_s3( if table_name in existing_s3_files: status["not_uploaded"].append(table_name) else: - - -<< << << < HEAD: src/transform_lambda/transform_lambda.py parquet_file = df.to_parquet( f"{table_name}.parquet", engine="pyarrow" ) # or fastparquet # changed parquet_file variable to the file name client.upload_file(f"{table_name}.parquet", bucket, f"{table_name}.parquet") -== == == = - parquet_buffer = io.BytesIO() - - # or engine="fastparquet" - df.to_parquet(parquet_buffer, engine="pyarrow") - - parquet_buffer.seek(0) - - client.upload_fileobj(parquet_buffer, bucket, - f"{table_name}.parquet") - ->>>>>> > 3f24ec753902feecec4c17e2877e19853bde1bb2: src/transform_lambda.py status["uploaded"].append(table_name) for table_name, df in mutable_df_dict.items(): @@ -205,12 +190,10 @@ def bucket_name(bucket_prefix, client=boto3.client("s3")): bucket["Name"] for bucket in response["Buckets"] if bucket_prefix in bucket["Name"] - ] -<<<<<<< HEAD:src/transform_lambda/transform_lambda.py -======= + ] + if not bucket_filter: raise ValueError(f"No bucket found with prefix: {bucket_prefix}") ->>>>>>> 3f24ec753902feecec4c17e2877e19853bde1bb2:src/transform_lambda.py return bucket_filter[0] -- cgit v1.2.3 From bf55c50ed6228eb1ca3b10e7280ed35944f7f42f Mon Sep 17 00:00:00 2001 From: "deepsource-autofix[bot]" <62050782+deepsource-autofix[bot]@users.noreply.github.com> Date: Wed, 28 Aug 2024 10:51:00 +0000 Subject: style: format code with Autopep8, Black and Ruff Formatter This commit fixes the style issues introduced in 6c85677 according to the output from Autopep8, Black and Ruff Formatter. Details: https://github.com/ajschofield/de-project-bentley/pull/106 --- src/transform_lambda/transform_lambda.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/transform_lambda/transform_lambda.py b/src/transform_lambda/transform_lambda.py index 02e9887..3dbb57b 100644 --- a/src/transform_lambda/transform_lambda.py +++ b/src/transform_lambda/transform_lambda.py @@ -118,8 +118,7 @@ def process_to_parquet_and_upload_to_s3( f"{table_name}.parquet", engine="pyarrow" ) # or fastparquet # changed parquet_file variable to the file name - client.upload_file(f"{table_name}.parquet", - bucket, f"{table_name}.parquet") + client.upload_file(f"{table_name}.parquet", bucket, f"{table_name}.parquet") status["uploaded"].append(table_name) for table_name, df in mutable_df_dict.items(): @@ -190,8 +189,8 @@ def bucket_name(bucket_prefix, client=boto3.client("s3")): bucket["Name"] for bucket in response["Buckets"] if bucket_prefix in bucket["Name"] - ] - + ] + if not bucket_filter: raise ValueError(f"No bucket found with prefix: {bucket_prefix}") -- cgit v1.2.3