aboutsummaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/test_fact_sales_order.py246
-rw-r--r--tests/test_transform_lambda.py79
2 files changed, 318 insertions, 7 deletions
diff --git a/tests/test_fact_sales_order.py b/tests/test_fact_sales_order.py
new file mode 100644
index 0000000..a245379
--- /dev/null
+++ b/tests/test_fact_sales_order.py
@@ -0,0 +1,246 @@
+from src.dataframes import *
+import pandas as pd
+from unittest.mock import patch
+from datetime import datetime as dt
+
+
+class TestCreateDimDesign:
+ def test_dim_design_returns_dataframe(self):
+ d = {
+ "test": ["Hello", "Bye"],
+ "design_id": ["Hello", "Bye"],
+ "design_name": ["Hello", "Bye"],
+ "file_name": ["Hello", "Bye"],
+ "file_location": ["Hello", "Bye"],
+ "Hello": ["Hello", "Bye"],
+ }
+ test_df = {"design": pd.DataFrame(data=d)}
+ result = create_dim_design(test_df)
+ assert isinstance(result, pd.DataFrame)
+
+ def test_dim_design_returns_correct_columns_and_values(self):
+ d = {
+ "test": ["Hello", "Bye"],
+ "design_id": ["Hello", "Bye"],
+ "design_name": ["Hello", "Bye"],
+ "file_name": ["Hello", "Bye"],
+ "file_location": ["Hello", "Bye"],
+ "Hello": ["Hello", "Bye"],
+ }
+ test_df = {"design": pd.DataFrame(data=d)}
+ result = create_dim_design(test_df)
+ d2 = {
+ "design_id": ["Hello", "Bye"],
+ "design_name": ["Hello", "Bye"],
+ "file_name": ["Hello", "Bye"],
+ "file_location": ["Hello", "Bye"],
+ }
+ expected_df = pd.DataFrame(data=d2)
+ expected_result = expected_df.copy()
+ assert result.equals(expected_result)
+
+
+class TestCreateDimStaff:
+ def test_dim_staff_returns_dataframe(self):
+ d = {
+ "staff_id": ["Hello", "Bye"],
+ "first_name": ["Hello", "Bye"],
+ "last_name": ["Hello", "Bye"],
+ "department_id": ["Hello", "Bye"],
+ }
+ d2 = {
+ "department_name": ["Hello", "Bye"],
+ "location": ["Hello", "Bye"],
+ "email_address": ["Hello", "Bye"],
+ "department_id": ["Hello", "Bye"],
+ }
+ test_df = {"staff": pd.DataFrame(data=d), "department": pd.DataFrame(data=d2)}
+ result = create_dim_staff(test_df)
+ assert isinstance(result, pd.DataFrame)
+
+ def test_dim_staff_returns_correct_columns_and_values(self):
+ d = {
+ "staff_id": ["Hello", "Bye"],
+ "first_name": ["Hello", "Bye"],
+ "last_name": ["Hello", "Bye"],
+ "department_id": ["Hello", "Bye"],
+ }
+ d2 = {
+ "department_name": ["Hello", "Bye"],
+ "location": ["Hello", "Bye"],
+ "email_address": ["Hello", "Bye"],
+ "department_id": ["Hello", "Bye"],
+ }
+ test_df = {"staff": pd.DataFrame(data=d), "department": pd.DataFrame(data=d2)}
+ result = create_dim_staff(test_df)
+ expected_d = {
+ "staff_id": ["Hello", "Bye"],
+ "first_name": ["Hello", "Bye"],
+ "last_name": ["Hello", "Bye"],
+ "department_name": ["Hello", "Bye"],
+ "location": ["Hello", "Bye"],
+ "email_address": ["Hello", "Bye"],
+ }
+ expected_df = pd.DataFrame(data=expected_d)
+ expected_result = expected_df.copy()
+ assert result.equals(expected_result)
+
+
+class TestCreatePaymentType:
+ def test_create_dim_payment_type_returns_correct_columns_and_values(self):
+ d = {"payment_type_id": ["Hello", "Bye"], "payment_type_name": ["Hello", "Bye"]}
+ test_df = {"payment_type": pd.DataFrame(data=d)}
+ result = create_dim_payment_type(test_df)
+ expected_columns = ["payment_type_id", "payment_type_name"]
+ expected_d = {
+ "payment_type_id": ["Hello", "Bye"],
+ "payment_type_name": ["Hello", "Bye"],
+ }
+ expected_df = pd.DataFrame(data=expected_d)
+ assert isinstance(result, pd.DataFrame)
+ assert list(result.columns) == expected_columns
+ assert result.equals(expected_df)
+
+
+class TestCreateDimCounterparty:
+ def test_create_dim_counterparty_type_returns_correct_columns_and_object(self):
+ data_l = pd.DataFrame(
+ data={
+ "counterparty_id": ["Hello", "Bye"],
+ "counterparty_legal_name": ["Hello", "Bye"],
+ "commercial_contact": ["Hello", "Bye"],
+ "legal_address_id": ["bond street", "regent street"],
+ }
+ )
+ data_a = pd.DataFrame(
+ data={
+ "address_id": ["bond street", "regent street"],
+ "postcode": [98365, 93753],
+ }
+ )
+ test_df = {"address": data_a, "counterparty": data_l}
+ result = create_dim_counterparty(test_df)
+
+ expected_columns = [
+ "counterparty_id",
+ "counterparty_legal_name",
+ "commercial_contact",
+ "counterparty_legal_postcode",
+ ]
+ print(data_l)
+ print(data_a)
+ assert isinstance(result, pd.DataFrame)
+ assert list(result.columns) == expected_columns
+
+
+class TestCreateDimCurrency:
+ def test_dim_currency_returns_columns_and_values(self):
+ nones = [None, None, None]
+ d = {
+ "currency_id": [1, 2, 3],
+ "currency_code": ["USD", "EUR", "GBP"],
+ "created_at": nones,
+ "last_updated": nones,
+ }
+ test_df = {"currency": pd.DataFrame(data=d)}
+ scraper_output = pd.DataFrame(
+ {
+ "currency_code": ["RUS", "USD", "PHP", "GBP", "EUR"],
+ "currency_name": ["Rubble", "US Dollar", "Peso", "Pound", "Euro"],
+ }
+ )
+ result = create_dim_currency(test_df, names=scraper_output).sort_values(
+ by="currency_code", axis=0
+ )
+ expected_d = {
+ "currency_id": [1, 2, 3],
+ "currency_code": ["USD", "EUR", "GBP"],
+ "currency_name": ["US Dollar", "Euro", "Pound"],
+ }
+ expected_df = pd.DataFrame(data=expected_d).sort_values(
+ by="currency_code", axis=0
+ )
+ assert isinstance(result, pd.DataFrame)
+ assert result.equals(expected_df)
+
+ def test_scrape_currency_names_returns_dataframe_with_correct_collumns(self):
+ result = scrape_currency_names()
+ assert isinstance(result, pd.DataFrame)
+ assert list(result.columns) == ["currency_code", "currency_name"]
+
+
+class TestCreateDimDate:
+ def test_returns_required_columns(self):
+ df_one = pd.DataFrame(
+ data={
+ "updated_date": dt(2020, 5, 17),
+ "created_date": dt(2021, 5, 13),
+ "not_dat": None,
+ },
+ index=[0],
+ )
+ df_two = pd.DataFrame(
+ data={"updated_date": dt(2020, 5, 17), "created_date": dt(2021, 9, 13)},
+ index=[0],
+ )
+ df_three = pd.DataFrame(
+ data={"updated_date": dt(2022, 5, 17), "created_date": dt(2023, 5, 13)},
+ index=[0],
+ )
+ expected_df = pd.DataFrame(
+ data=[
+ [dt(2020, 5, 17), 2020, 5, 17, 6, "Sunday", "May", 2],
+ [dt(2021, 5, 13), 2021, 5, 13, 3, "Thursday", "May", 2],
+ [dt(2021, 9, 13), 2021, 9, 13, 0, "Monday", "September", 3],
+ [dt(2022, 5, 17), 2022, 5, 17, 1, "Tuesday", "May", 2],
+ [dt(2023, 5, 13), 2023, 5, 13, 5, "Saturday", "May", 2],
+ ],
+ columns=[
+ "date_id",
+ "year",
+ "month",
+ "day",
+ "day_of_week",
+ "day_name",
+ "month_name",
+ "quarter",
+ ],
+ )
+ with patch("src.dataframes.create_fact_payment") as mock_fp:
+ with patch("src.dataframes.create_fact_purchase_orders") as mock_fpo:
+ with patch("src.dataframes.create_fact_sales_order") as mock_fso:
+ mock_fp.return_value = df_one
+ mock_fpo.return_value = df_two
+ mock_fso.return_value = df_three
+ result = create_dim_date({"dum": 0})
+ result.reset_index(inplace=True, drop=True)
+ assert result.eq(expected_df, axis="columns").all(axis=None)
+
+
+class TestCreateDimLocation:
+ def test_returns_correct_columns_lo(self):
+ dict_df = {
+ "address": pd.DataFrame(
+ data=[["some_time", "some_other_time", 1, "SE18 9QO"]],
+ columns=["created_at", "last_updated", "address_id", "postal_code"],
+ )
+ }
+ result = create_dim_location(dict_df)
+ assert list(result.columns) == ["location_id", "postal_code"]
+
+
+class TestCreateDimTransaction:
+ def test_returns_correct_columns_tr(self):
+ dict_df = {
+ "transaction": pd.DataFrame(
+ data=[["some_time", "some_other_time", 1, "SE18 9QO"]],
+ columns=[
+ "created_at",
+ "last_updated",
+ "transaction_id",
+ "some_other_id",
+ ],
+ )
+ }
+ result = create_dim_transaction(dict_df)
+ assert list(result.columns) == ["transaction_id", "some_other_id"]
diff --git a/tests/test_transform_lambda.py b/tests/test_transform_lambda.py
index 4c689f7..5ed743e 100644
--- a/tests/test_transform_lambda.py
+++ b/tests/test_transform_lambda.py
@@ -1,11 +1,23 @@
-from src.transform_lambda import read_from_s3_subfolder_to_df
+from src.transform_lambda import (
+ read_from_s3_subfolder_to_df,
+ list_existing_s3_files,
+ bucket_name,
+)
from moto import mock_aws
import pytest
import pandas as pd
import os
import boto3
+from botocore.exceptions import ClientError
import numpy as np
+# import caplog
+import logging
+
+
+logger = logging.getLogger()
+logger.setLevel(logging.INFO)
+
@pytest.fixture(scope="class")
def aws_credentials():
@@ -23,7 +35,7 @@ def s3_client(aws_credentials):
class TestReadFromS3:
- @pytest.mark.skip(reason="The test is broken!")
+ # @pytest.mark.skip(reason="The test is broken!")
def test_returns_dictionary_with_correct_value_pair(self, s3_client):
s3_client.create_bucket(
Bucket="dummy_buc",
@@ -40,15 +52,20 @@ class TestReadFromS3:
)
print(result)
expected_df = pd.DataFrame(
- np.array([["Vegetable", "Sour", "Green"], ["Berry", "Sweet", "Red"]]),
- columns=["Food_type", "Flavour", "Colour"],
+ np.array(
+ [
+ ["Vegetable", "Sour", "Green", "2022-11-03 14:20:49.962"],
+ ["Berry", "Sweet", "Red", "2022-11-03 14:20:49.962"],
+ ]
+ ),
+ columns=["Food_type", "Flavour", "Colour", "last_updated"],
)
assert isinstance(result, dict)
assert list(result.keys())[0] == "Foods"
assert isinstance(result["Foods"], pd.DataFrame)
assert result["Foods"].eq(expected_df, axis="columns").all(axis=None)
- @pytest.mark.skip(reason="The test is broken!")
+ # @pytest.mark.skip(reason="The test is broken!")
def test_returns_dictionary_of_dataframes_for_multiple_tables(self, s3_client):
s3_client.upload_file(
"tests/dummy_2.csv", "dummy_buc", "Cars/2024/08/21/Cars_14:03:56.csv"
@@ -58,8 +75,13 @@ class TestReadFromS3:
tables, bucket="dummy_buc", client=s3_client
)
expected_foods_df = pd.DataFrame(
- np.array([["Vegetable", "Sour", "Green"], ["Berry", "Sweet", "Red"]]),
- columns=["Food_type", "Flavour", "Colour"],
+ np.array(
+ [
+ ["Vegetable", "Sour", "Green", "2022-11-03 14:20:49.962"],
+ ["Berry", "Sweet", "Red", "2022-11-03 14:20:49.962"],
+ ]
+ ),
+ columns=["Food_type", "Flavour", "Colour", "last_updated"],
)
expected_cars_df = pd.DataFrame(
np.array(
@@ -74,3 +96,46 @@ class TestReadFromS3:
assert list(result.keys()) == tables
assert result["Foods"].eq(expected_foods_df, axis="columns").all(axis=None)
assert result["Cars"].eq(expected_cars_df, axis="columns").all(axis=None)
+
+
+class TestListExistingFiles:
+ def test_functions_receives_error_if_no_bucket(self, s3_client, caplog):
+ caplog.set_level(logging.INFO)
+
+ with pytest.raises(ClientError):
+ list_existing_s3_files("rando_bucket", client=s3_client)
+
+ assert (
+ "Error listing S3 objects: An error occurred (NoSuchBucket) when calling the ListObjectsV2 operation: The specified bucket does not exist"
+ in caplog.text
+ )
+
+ def test_recieves_logger_error_if_no_files_listed(self, s3_client, caplog):
+ caplog.set_level(logging.INFO)
+
+ s3_client.create_bucket(
+ Bucket="mock_bucket",
+ CreateBucketConfiguration={"LocationConstraint": "eu-west-2"},
+ )
+ response = list_existing_s3_files("mock_bucket", client=s3_client)
+ assert "The bucket is empty" in caplog.text
+
+ def test_retrieves_existing_files(self, s3_client, caplog):
+ caplog.set_level(logging.INFO)
+
+ s3_client.upload_file("tests/dummy.txt", "mock_bucket", "dummy.txt")
+ result = list_existing_s3_files("mock_bucket", client=s3_client)
+ assert result == ["dummy.txt"]
+
+
+class TestBucketName:
+ def test_functions_retrieves_bucket(self, s3_client):
+ s3_client.create_bucket(
+ Bucket="mock_bucket",
+ CreateBucketConfiguration={"LocationConstraint": "eu-west-2"},
+ )
+
+ bucket = bucket_name("mock_bucket", s3_client)
+ assert bucket == "mock_bucket"
+
+ # def test_
git.ajschof.me — hosted by ajschofield — powered by cgit