aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--car_data.parquetbin0 -> 2827 bytes
-rw-r--r--src/transform_lambda.py59
-rw-r--r--tests/test_transform_lambda.py39
3 files changed, 71 insertions, 27 deletions
diff --git a/car_data.parquet b/car_data.parquet
new file mode 100644
index 0000000..1853af6
--- /dev/null
+++ b/car_data.parquet
Binary files differ
diff --git a/src/transform_lambda.py b/src/transform_lambda.py
index cd9541d..9830e0f 100644
--- a/src/transform_lambda.py
+++ b/src/transform_lambda.py
@@ -9,7 +9,7 @@ import pyarrow.parquet as pq
from botocore.exceptions import ClientError
from pg8000.native import Connection, InterfaceError
from datetime import datetime
-
+import io
class DBConnectionException(Exception):
"""Wraps pg8000.native Error or DatabaseError."""
@@ -59,6 +59,8 @@ def lambda_handler(event, context):
TABLES, bucket_name("extract"), client=boto3.client("s3")
)
+ print(dict_of_df)
+
immutable_df_dict = {
"dim_counterparty": create_dim_counterparty(dict_of_df),
"dim_date": create_dim_date(dict_of_df),
@@ -106,7 +108,7 @@ def process_to_parquet_and_upload_to_s3(
immutable_df_dict,
mutable_df_dict,
bucket,
- client=boto3.client("s3"),
+ client=boto3.client("s3")
):
status = {"uploaded": [], "not_uploaded": []}
@@ -114,21 +116,25 @@ def process_to_parquet_and_upload_to_s3(
if table_name in existing_s3_files:
status["not_uploaded"].append(table_name)
else:
- parquet_file = df.to_parquet(
- f"{table_name}.parquet", engine="pyarrow"
- ) # or fastparquet
- client.upload_file(parquet_file, bucket, f"{table_name}.parquet")
+ parquet_buffer = io.BytesIO()
+
+ df.to_parquet(parquet_buffer, engine="pyarrow") # or engine="fastparquet"
+
+ parquet_buffer.seek(0)
+
+ client.upload_fileobj(parquet_buffer, bucket, f"{table_name}.parquet")
+
status["uploaded"].append(table_name)
- for table_name, df in mutable_df_dict.items():
- s3_key = datetime.strftime(
- datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.parquet"
- )
- parquet_file = df.to_parquet(
- f"{table_name}.parquet", engine="pyarrow"
- ) # or fastparquet
- client.upload_file(parquet_file, bucket, s3_key)
- status["uploaded"].append(table_name)
+ # for table_name, df in mutable_df_dict.items():
+ # s3_key = datetime.strftime(
+ # datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.parquet"
+ # )
+ # parquet_file = df.to_parquet(
+ # f"{table_name}.parquet", engine="pyarrow"
+ # ) # or fastparquet
+ # client.upload_file(parquet_file, bucket, s3_key)
+ # status["uploaded"].append(table_name)
return status
@@ -182,20 +188,23 @@ def read_from_s3_subfolder_to_df(tables, bucket, client=boto3.client("s3")):
return table_dfs
+
+
def bucket_name(bucket_prefix, client=boto3.client("s3")):
- # response = client.list_buckets()
- # for bucket in response["Buckets"]:
- # if bucket_prefix in bucket["Name"]:
- # return bucket["Name"]
-
-
- response = client.list_buckets()
- bucket_filter = [
+
+ response = client.list_buckets()
+ bucket_filter = [
bucket["Name"]
for bucket in response["Buckets"]
if bucket_prefix in bucket["Name"]
- ]
- return bucket_filter[0]
+ ]
+ if not bucket_filter:
+ raise ValueError(f"No bucket found with prefix: {bucket_prefix}")
+
+ return bucket_filter[0]
+
+
+
def list_existing_s3_files(bucket_name, client=boto3.client("s3")):
diff --git a/tests/test_transform_lambda.py b/tests/test_transform_lambda.py
index cc4e07a..b4836c2 100644
--- a/tests/test_transform_lambda.py
+++ b/tests/test_transform_lambda.py
@@ -1,7 +1,7 @@
from src.transform_lambda import (
read_from_s3_subfolder_to_df,
list_existing_s3_files,
- bucket_name,
+ bucket_name, process_to_parquet_and_upload_to_s3
)
from moto import mock_aws
import pytest
@@ -152,4 +152,39 @@ class TestBucketName:
def test_transform_bucket_name(self, mock_extract_bucket, mock_transform_bucket, s3_client):
bucket2 = bucket_name('dummy_transform_buc', s3_client)
assert bucket2 == 'dummy_transform_buc'
- \ No newline at end of file
+
+
+ def test_recieves_error_when_bucket_doesnt_exist(self, mock_extract_bucket, s3_client):
+ s3_client.delete_bucket(Bucket='dummy_extract_buc')
+ with pytest.raises(ValueError):
+ bucket_name('dummy_extract_buc', s3_client)
+
+
+
+
+
+
+class TestProcessToParquetUploadS3:
+ def test_func_uploads_to_s3(self, mock_transform_bucket, s3_client):
+
+ expected_cars_df = pd.DataFrame(
+ np.array(
+ [
+ ["Truck", "Chevrolet", "Grey"],
+ ["Convertible", "Mercedes", "Red"],
+ ["Van", "Volkswagen", "Blue"],
+ ]
+ ),
+ columns=["Car_type", "Brand", "Colour"],
+ )
+ mock_dim_dict = {'car_data': expected_cars_df}
+
+ response = process_to_parquet_and_upload_to_s3([], mock_dim_dict, {}, mock_transform_bucket, s3_client)
+
+
+ assert response == {"uploaded": ["car_data"], "not_uploaded": []}
+
+
+
+
+
git.ajschof.me — hosted by ajschofield — powered by cgit