diff options
| author | T-Aji <tolujbd2@gmail.com> | 2024-08-23 09:33:32 +0100 |
|---|---|---|
| committer | T-Aji <tolujbd2@gmail.com> | 2024-08-23 09:33:32 +0100 |
| commit | f7dedf78465d27abec5f467d377ec67741b44fb3 (patch) | |
| tree | c4b3c8983f9c236b2505485ede3bc9154c22e91d | |
| parent | a8cadadfe2b96c84a29a252110822ec535a0da7e (diff) | |
| parent | f4bd9e3c85341c0805821728d42d74c19cb16bde (diff) | |
| download | de-project-bentley-f7dedf78465d27abec5f467d377ec67741b44fb3.tar.gz de-project-bentley-f7dedf78465d27abec5f467d377ec67741b44fb3.zip | |
Merge branch 'feature/transform-fact-sales-order' of https://github.com/ajschofield/de-project-bentley into feature/transform-fact-sales-order
| -rw-r--r-- | requirements.txt | 4 | ||||
| -rw-r--r-- | src/fact_purchase_table.py (renamed from src/fact-purchase-table.py) | 2 | ||||
| -rw-r--r-- | src/transform_lambda.py | 56 | ||||
| -rw-r--r-- | tests/test_transform_lambda.py | 10 |
4 files changed, 62 insertions, 10 deletions
diff --git a/requirements.txt b/requirements.txt index 62ebbf4..0c81216 100644 --- a/requirements.txt +++ b/requirements.txt @@ -29,4 +29,6 @@ urllib3==2.2.2 Werkzeug==3.0.3 xmltodict==0.13.0 s3fs -pandas
\ No newline at end of file +pandas +bs4 +pyarrow
\ No newline at end of file diff --git a/src/fact-purchase-table.py b/src/fact_purchase_table.py index 597f104..f1d8fe1 100644 --- a/src/fact-purchase-table.py +++ b/src/fact_purchase_table.py @@ -1,3 +1,4 @@ +from bs4 import BeautifulSoup from src.transform_lambda import read_from_s3_subfolder_to_df, tables from src.extract_lambda import extract_bucket import json @@ -6,7 +7,6 @@ import re import pandas as pd from datetime import datetime as dt import requests -from bs4 import BeautifulSoup ## dim_staff table is the same across the schemas (no change) diff --git a/src/transform_lambda.py b/src/transform_lambda.py index 920a24f..6024a24 100644 --- a/src/transform_lambda.py +++ b/src/transform_lambda.py @@ -2,10 +2,11 @@ import json import boto3 import re import pandas as pd - - -def lambda_handler(event, context): - pass +import pyarrow as pa +import pyarrow.parquet as pq +from src.extract_lambda import extract_bucket +from src.fact_purchase_table import * +from src.fact_sales_order import create_dim_staff, create_dim_design, create_fact_sales_order tables = [ @@ -22,6 +23,47 @@ tables = [ "payment_type", ] +def lambda_handler(event, context): + dict_of_df = read_from_s3_subfolder_to_df(tables, extract_bucket(), client=boto3.client("s3")) + common_df_list = [create_dim_counterparty(dict_of_df), + create_dim_date(dict_of_df), + create_dim_location(dict_of_df), + create_dim_currency(dict_of_df), + create_dim_staff(dict_of_df)] + + create_fact_purchase_order() + + f_sales_list = [create_fact_sales_order(), + create_dim_design()] + + + ''' + #dict{ + sales_schema: { + Table_name: df_value, + ...} + payment_schema: + Table_name: df_value, + ...} + purchase_schema: + Table_name: df_value, + ...} + } + + for schema in dict: + for table_name, df_value in schema.items(): + parquet_file = df_value.to_parquet(f'{table_name}.parquet', engine='pyarrow'/'fastparquet'(?)) #we don't know the engine + + s3_key = datetime.strftime( + datetime.today(), f"{schema}/%Y/%m/%d/{table_name}_%H:%M:%S.parquet" + ) + + client.upload_file( + parquet_file, transform_bucket(), s3_key) + ##might need seperate function for easier testing## + ''' + + def read_from_s3_subfolder_to_df(tables, bucket, client=boto3.client("s3")): table_dfs = {} @@ -34,4 +76,10 @@ def read_from_s3_subfolder_to_df(tables, bucket, client=boto3.client("s3")): table_dfs[table] = pd.concat(list_of_df) return table_dfs +def transform_bucket(client=boto3.client("s3")): + response = client.list_buckets() + bucket_filter = [ + bucket["Name"] for bucket in response["Buckets"] if "transform" in bucket["Name"] + ] + return bucket_filter[0] diff --git a/tests/test_transform_lambda.py b/tests/test_transform_lambda.py index 5121905..516f83b 100644 --- a/tests/test_transform_lambda.py +++ b/tests/test_transform_lambda.py @@ -39,8 +39,8 @@ class TestReadFromS3: ) print(result) expected_df = pd.DataFrame( - np.array([["Vegetable", "Sour", "Green"], ["Berry", "Sweet", "Red"]]), - columns=["Food_type", "Flavour", "Colour"], + np.array([["Vegetable", "Sour", "Green", "2022-11-03 14:20:49.962"], ["Berry", "Sweet", "Red", "2022-11-03 14:20:49.962"]]), + columns=["Food_type", "Flavour", "Colour", "last_updated"], ) assert isinstance(result, dict) assert list(result.keys())[0] == "Foods" @@ -56,8 +56,8 @@ class TestReadFromS3: tables, bucket="dummy_buc", client=s3_client ) expected_foods_df = pd.DataFrame( - np.array([["Vegetable", "Sour", "Green"], ["Berry", "Sweet", "Red"]]), - columns=["Food_type", "Flavour", "Colour"], + np.array([["Vegetable", "Sour", "Green", "2022-11-03 14:20:49.962"], ["Berry", "Sweet", "Red", "2022-11-03 14:20:49.962"]]), + columns=["Food_type", "Flavour", "Colour", "last_updated"], ) expected_cars_df = pd.DataFrame( np.array( @@ -72,3 +72,5 @@ class TestReadFromS3: assert list(result.keys()) == tables assert result["Foods"].eq(expected_foods_df, axis="columns").all(axis=None) assert result["Cars"].eq(expected_cars_df, axis="columns").all(axis=None) + + |
