aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorlian-manonog <lian.manonog@gmail.com>2024-08-28 14:59:05 +0100
committerlian-manonog <lian.manonog@gmail.com>2024-08-28 14:59:05 +0100
commitfadc54b7f72eca1eccbb9f4e7bb8ffca0960ebfa (patch)
treeda2fff127164b54ec8af3e8157ca45da9e1ae56f
parentf9f1ebc3eb7a9d4f312db5c1402a0197e0777b29 (diff)
downloadde-project-bentley-fadc54b7f72eca1eccbb9f4e7bb8ffca0960ebfa.tar.gz
de-project-bentley-fadc54b7f72eca1eccbb9f4e7bb8ffca0960ebfa.zip
wip finished testing the process and upload parquet
-rw-r--r--src/transform_lambda/transform_lambda.py6
-rw-r--r--tests/test_transform_lambda.py61
2 files changed, 60 insertions, 7 deletions
diff --git a/src/transform_lambda/transform_lambda.py b/src/transform_lambda/transform_lambda.py
index 3dbb57b..478b257 100644
--- a/src/transform_lambda/transform_lambda.py
+++ b/src/transform_lambda/transform_lambda.py
@@ -5,12 +5,11 @@ import logging
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
-from dataframes import *
+from src.transform_lambda.dataframes import *
from botocore.exceptions import ClientError
from pg8000.native import Connection, InterfaceError
from datetime import datetime
-
class DBConnectionException(Exception):
"""Wraps pg8000.native Error or DatabaseError."""
@@ -54,6 +53,7 @@ def lambda_handler(event, context):
bucket = bucket_name("transform")
existing_s3_files = list_existing_s3_files(bucket)
+ # print(existing_s3_files)
dict_of_df = read_from_s3_subfolder_to_df(
TABLES, bucket_name("extract"), client=boto3.client("s3")
@@ -120,11 +120,13 @@ def process_to_parquet_and_upload_to_s3(
# changed parquet_file variable to the file name
client.upload_file(f"{table_name}.parquet", bucket, f"{table_name}.parquet")
status["uploaded"].append(table_name)
+ print(status)
for table_name, df in mutable_df_dict.items():
s3_key = datetime.strftime(
datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.parquet"
)
+ print(s3_key, '<<<< this is S3_Key')
parquet_file = df.to_parquet(
f"{table_name}.parquet", engine="pyarrow"
) # or fastparquet
diff --git a/tests/test_transform_lambda.py b/tests/test_transform_lambda.py
index 3d6e82a..0961301 100644
--- a/tests/test_transform_lambda.py
+++ b/tests/test_transform_lambda.py
@@ -167,7 +167,7 @@ class TestBucketName:
class TestProcessToParquetUploadS3:
- def test_func_doesnt_upoad_if_file_exists(self, mock_transform_bucket, s3_client):
+ def test_func_doesnt_upload_if_file_exists(self, mock_transform_bucket, s3_client):
expected_cars_df = pd.DataFrame(
np.array(
[
@@ -181,9 +181,13 @@ class TestProcessToParquetUploadS3:
mock_dim_dict = {"car_data": expected_cars_df}
response = process_to_parquet_and_upload_to_s3(
- ['car_data'], mock_dim_dict, {}, mock_transform_bucket, s3_client
+ ['car_data'], mock_dim_dict, {}, "dummy_transform_buc", s3_client
)
+ # keys = s3_client.get_object(
+ # Bucket='dummy_transform_buc',
+ # Key='car_data.parquet'
+ # )
assert response == {"uploaded": [], "not_uploaded": ['car_data']}
def test_func_uploads_data_if_doesnt_exist(self, mock_transform_bucket, s3_client):
@@ -199,9 +203,56 @@ class TestProcessToParquetUploadS3:
)
mock_dim_dict = {"flower_data": expected_flower_df}
+
response = process_to_parquet_and_upload_to_s3(
- ['car_data'], mock_dim_dict, {}, mock_transform_bucket, s3_client
+ ['car_data'], mock_dim_dict, {}, "dummy_transform_buc", s3_client
+ )
+
+ assert response == {"uploaded": ['flower_data'], "not_uploaded": []}
+
+ def test_func_uploads_several_files_and_checks_for_parquet_files(self, mock_transform_bucket, s3_client):
+ expected_vegetable_df = pd.DataFrame(
+ np.array(
+ [
+ ["Carrot", "Orange", "Edible"],
+ ["Broccoli", "Green", "Yes"],
+ ]
+ ),
+ columns=["Vegetable", "Colour", "Edible"],
+ )
+
+ expected_meat_df = pd.DataFrame(
+ np.array(
+ [
+ ["Chicken", "White", "Yes"],
+ ["Beef", "Red", "No"],
+ ]
+ ),
+ columns=["Meat", "Colour", "Edible"],
)
- assert response == {"uploaded": ['flower_data'], "not_uploaded": ['car_data']}
- # assert \ No newline at end of file
+ mock_dim_dict = {"vegetable_data": expected_vegetable_df}
+ mock_fact_dict = {"meat_data": expected_meat_df}
+
+ expected_vegetable_df.to_parquet("vegetable_data.parquet", engine="pyarrow")
+ s3_client.upload_file("vegetable_data.parquet", 'dummy_transform_buc', "vegetable_data.parquet")
+
+ print(f"Type of mock_transform_bucket: {type(mock_transform_bucket)}")
+ print(f"Type of mock_dim_dict: {type(mock_dim_dict)}")
+ print(f"Type of items in mock_dim_dict: {[type(i) for i in mock_dim_dict.values()]}")
+ print(f"Type of s3_client: {type(s3_client)}")
+
+ response = process_to_parquet_and_upload_to_s3(
+ ['vegetable_data'], mock_dim_dict, mock_fact_dict, "dummy_transform_buc", s3_client
+ )
+
+ assert response == {"uploaded": ['meat_data'], "not_uploaded": ['vegetable_data']}
+
+ def test_func_handles_empty_dicts(self, mock_transform_bucket, s3_client):
+ response = process_to_parquet_and_upload_to_s3(
+ [], {}, {}, 'dummy_transform_buc', s3_client
+ )
+
+ assert response == {"uploaded": [], "not_uploaded": []}
+
+class TestLambdaHandler \ No newline at end of file
git.ajschof.me — hosted by ajschofield — powered by cgit