From fadc54b7f72eca1eccbb9f4e7bb8ffca0960ebfa Mon Sep 17 00:00:00 2001 From: lian-manonog Date: Wed, 28 Aug 2024 14:59:05 +0100 Subject: wip finished testing the process and upload parquet --- src/transform_lambda/transform_lambda.py | 6 ++-- tests/test_transform_lambda.py | 61 +++++++++++++++++++++++++++++--- 2 files changed, 60 insertions(+), 7 deletions(-) diff --git a/src/transform_lambda/transform_lambda.py b/src/transform_lambda/transform_lambda.py index 3dbb57b..478b257 100644 --- a/src/transform_lambda/transform_lambda.py +++ b/src/transform_lambda/transform_lambda.py @@ -5,12 +5,11 @@ import logging import pandas as pd import pyarrow as pa import pyarrow.parquet as pq -from dataframes import * +from src.transform_lambda.dataframes import * from botocore.exceptions import ClientError from pg8000.native import Connection, InterfaceError from datetime import datetime - class DBConnectionException(Exception): """Wraps pg8000.native Error or DatabaseError.""" @@ -54,6 +53,7 @@ def lambda_handler(event, context): bucket = bucket_name("transform") existing_s3_files = list_existing_s3_files(bucket) + # print(existing_s3_files) dict_of_df = read_from_s3_subfolder_to_df( TABLES, bucket_name("extract"), client=boto3.client("s3") @@ -120,11 +120,13 @@ def process_to_parquet_and_upload_to_s3( # changed parquet_file variable to the file name client.upload_file(f"{table_name}.parquet", bucket, f"{table_name}.parquet") status["uploaded"].append(table_name) + print(status) for table_name, df in mutable_df_dict.items(): s3_key = datetime.strftime( datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.parquet" ) + print(s3_key, '<<<< this is S3_Key') parquet_file = df.to_parquet( f"{table_name}.parquet", engine="pyarrow" ) # or fastparquet diff --git a/tests/test_transform_lambda.py b/tests/test_transform_lambda.py index 3d6e82a..0961301 100644 --- a/tests/test_transform_lambda.py +++ b/tests/test_transform_lambda.py @@ -167,7 +167,7 @@ class TestBucketName: class TestProcessToParquetUploadS3: - def test_func_doesnt_upoad_if_file_exists(self, mock_transform_bucket, s3_client): + def test_func_doesnt_upload_if_file_exists(self, mock_transform_bucket, s3_client): expected_cars_df = pd.DataFrame( np.array( [ @@ -181,9 +181,13 @@ class TestProcessToParquetUploadS3: mock_dim_dict = {"car_data": expected_cars_df} response = process_to_parquet_and_upload_to_s3( - ['car_data'], mock_dim_dict, {}, mock_transform_bucket, s3_client + ['car_data'], mock_dim_dict, {}, "dummy_transform_buc", s3_client ) + # keys = s3_client.get_object( + # Bucket='dummy_transform_buc', + # Key='car_data.parquet' + # ) assert response == {"uploaded": [], "not_uploaded": ['car_data']} def test_func_uploads_data_if_doesnt_exist(self, mock_transform_bucket, s3_client): @@ -199,9 +203,56 @@ class TestProcessToParquetUploadS3: ) mock_dim_dict = {"flower_data": expected_flower_df} + response = process_to_parquet_and_upload_to_s3( - ['car_data'], mock_dim_dict, {}, mock_transform_bucket, s3_client + ['car_data'], mock_dim_dict, {}, "dummy_transform_buc", s3_client + ) + + assert response == {"uploaded": ['flower_data'], "not_uploaded": []} + + def test_func_uploads_several_files_and_checks_for_parquet_files(self, mock_transform_bucket, s3_client): + expected_vegetable_df = pd.DataFrame( + np.array( + [ + ["Carrot", "Orange", "Edible"], + ["Broccoli", "Green", "Yes"], + ] + ), + columns=["Vegetable", "Colour", "Edible"], + ) + + expected_meat_df = pd.DataFrame( + np.array( + [ + ["Chicken", "White", "Yes"], + ["Beef", "Red", "No"], + ] + ), + columns=["Meat", "Colour", "Edible"], ) - assert response == {"uploaded": ['flower_data'], "not_uploaded": ['car_data']} - # assert \ No newline at end of file + mock_dim_dict = {"vegetable_data": expected_vegetable_df} + mock_fact_dict = {"meat_data": expected_meat_df} + + expected_vegetable_df.to_parquet("vegetable_data.parquet", engine="pyarrow") + s3_client.upload_file("vegetable_data.parquet", 'dummy_transform_buc', "vegetable_data.parquet") + + print(f"Type of mock_transform_bucket: {type(mock_transform_bucket)}") + print(f"Type of mock_dim_dict: {type(mock_dim_dict)}") + print(f"Type of items in mock_dim_dict: {[type(i) for i in mock_dim_dict.values()]}") + print(f"Type of s3_client: {type(s3_client)}") + + response = process_to_parquet_and_upload_to_s3( + ['vegetable_data'], mock_dim_dict, mock_fact_dict, "dummy_transform_buc", s3_client + ) + + assert response == {"uploaded": ['meat_data'], "not_uploaded": ['vegetable_data']} + + def test_func_handles_empty_dicts(self, mock_transform_bucket, s3_client): + response = process_to_parquet_and_upload_to_s3( + [], {}, {}, 'dummy_transform_buc', s3_client + ) + + assert response == {"uploaded": [], "not_uploaded": []} + +class TestLambdaHandler \ No newline at end of file -- cgit v1.2.3