diff options
| author | Ang Bel <anzelikabelotelova@Anzelikas-MacBook-Air.local> | 2024-08-15 16:45:47 +0100 |
|---|---|---|
| committer | Ang Bel <anzelikabelotelova@Anzelikas-MacBook-Air.local> | 2024-08-15 16:45:47 +0100 |
| commit | c9bf342c8f6038a3f5397bfc8c53d251f27e7eec (patch) | |
| tree | 9658e903e9d66df57525f92439a62aafa313a564 | |
| parent | 7642266611b370b6e945e132c8e7b26c8d6fe9f3 (diff) | |
| download | de-project-bentley-c9bf342c8f6038a3f5397bfc8c53d251f27e7eec.tar.gz de-project-bentley-c9bf342c8f6038a3f5397bfc8c53d251f27e7eec.zip | |
procefss_and_upload_tables test in progress
| -rw-r--r-- | requirements.txt | 30 | ||||
| -rw-r--r-- | src/extract_lambda.py | 30 | ||||
| -rw-r--r-- | tests/dummy_identical.csv | 4 | ||||
| -rw-r--r-- | tests/test_extract_lambda.py | 47 |
4 files changed, 77 insertions, 34 deletions
diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..6f383f9 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,30 @@ +asn1crypto==1.5.1 +boto3==1.34.159 +botocore==1.34.159 +certifi==2024.7.4 +cffi==1.17.0 +charset-normalizer==3.3.2 +cryptography==43.0.0 +idna==3.7 +iniconfig==2.0.0 +Jinja2==3.1.4 +jmespath==1.0.1 +MarkupSafe==2.1.5 +moto==5.0.12 +packaging==24.1 +pg8000==1.31.2 +pluggy==1.5.0 +pycparser==2.22 +pytest==8.3.2 +pytest-mock==3.14.0 +python-dateutil==2.9.0.post0 +python-dotenv==1.0.1 +PyYAML==6.0.2 +requests==2.32.3 +responses==0.25.3 +s3transfer==0.10.2 +scramp==1.4.5 +six==1.16.0 +urllib3==2.2.2 +Werkzeug==3.0.3 +xmltodict==0.13.0
\ No newline at end of file diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 56b47a6..fb2d7e8 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -6,6 +6,7 @@ from botocore.exceptions import ClientError import logging import json from datetime import datetime +import re logger = logging.getLogger() @@ -117,9 +118,16 @@ def process_and_upload_tables(db, existing_files, client=boto3.client('s3')): in the existing_files dictionary with the same name. If it finds any changes to files, or new tables/files it uploads them to the s3 bucket """ - + ## NEW CODE + all_datetimes = [] + for file_names in existing_files.keys(): + datetime_str_on_s3 = ''.join(re.search(r'\/(.+/).+_(.+)\.csv',file_names).group(1,2)) + all_datetimes.append(datetime.strptime(datetime_str_on_s3, '%Y/%m/%d/%H:%M:%S')) + latest_timestamp = max(all_datetimes) + ## END OF NEW CODE + tables = db.run("SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';") - + print(tables) for table in tables: table_name = table[0] rows = db.run(f"SELECT * FROM {table_name};") @@ -128,17 +136,21 @@ def process_and_upload_tables(db, existing_files, client=boto3.client('s3')): csv_file_path = f"/tmp/{table_name}.csv" with open(csv_file_path, "w", newline='') as file: writer = csv.writer(file) - column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")] + #column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")] + column_names = [col_name[0] for col_name in db.run(f"SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = '{table_name}';")] writer.writerow(column_names) writer.writerows(rows) - - s3_key = f"{table_name}/{datetime.today().year}/{datetime.today().month}/{datetime.today().day}/{table_name}_{datetime.now().strftime('%H:%M:%S')}.csv" + s3_key = datetime.strftime(datetime.today(),f'{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv') new_csv_content = open(csv_file_path, "r").read() - - - if s3_key not in existing_files or existing_files[s3_key] != new_csv_content: + ## NEW CODE + latest_s3_object_key = datetime.strftime(latest_timestamp,f'{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv') + ## END OF NEW CODE + if existing_files[latest_s3_object_key] != new_csv_content: try: client.upload_file(csv_file_path, 'extract_bucket', s3_key) logger.info(f"Uploaded {s3_key} to S3.") except ClientError as e: - logger.error(f'Error uploading to S3: {e}')
\ No newline at end of file + logger.error(f'Error uploading to S3: {e}') + else: + logger.info(f"No new data.") +
\ No newline at end of file diff --git a/tests/dummy_identical.csv b/tests/dummy_identical.csv new file mode 100644 index 0000000..fdd8993 --- /dev/null +++ b/tests/dummy_identical.csv @@ -0,0 +1,4 @@ +Food_type,Flavour,Colour +Vegetable,Sour,Green +Berry,Sweet,Red + diff --git a/tests/test_extract_lambda.py b/tests/test_extract_lambda.py index 74d7e2c..e94a8a4 100644 --- a/tests/test_extract_lambda.py +++ b/tests/test_extract_lambda.py @@ -1,7 +1,7 @@ import pytest import boto3 from moto import mock_aws -from unittest.mock import patch +from unittest.mock import patch, MagicMock from unittest import TestCase from src.extract_lambda import list_existing_s3_files, connect_to_database, DBConnectionException, process_and_upload_tables import os @@ -81,32 +81,29 @@ class TestConnectToDatabase: with pytest.raises(DBConnectionException): connect_to_database() assert 'Interface error' in caplog.text - +''' class TestProcessAndUploadTables: - def test_error_process_and_upload_tables(mock_conn, mock_config, s3_client, caplog, mocker): + def test_error_process_and_upload_tables(mock_conn, mock_config, s3_client, caplog): logger = logging.getLogger() logger.info('Testing now.') caplog.set_level(logging.ERROR) - - with patch("src.extract_lambda.Connection", autospec=True) as mock_conn: - mock_db = connect_to_database() - # need to add a table - s3_key = 'dummy/2024/8/14/dummy_16:46:30.txt' - mock_existing_files = mocker.Mock(return_value={s3_key: 'This is a test file.' }) + #### + queries = ["SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';", + "SELECT * FROM Fruits;", + "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = 'Fruits'"] + return_values = [[['Fruits']], + [['Vegetable','Sour','Green'],['Berry','Sweet','Red']], + [['Food_type'],['Flavour'],['Colour']]] + vals = dict(zip(queries,return_values)) + + #### + with patch('src.extract_lambda.connect_to_database') as mock_db: + mock_db().run.side_effects = return_values + s3_key = 'Fruits/2024/08/15/Fruits_16:46:30.csv' + existing_files = {s3_key: 'Food_type,Flavour,Colour\nFruit,Sour,Green\nBerry,Sweet,Red'} s3_client.create_bucket(Bucket='extract_bucket', - CreateBucketConfiguration={ - 'LocationConstraint': 'eu-west-2' - }) - s3_client.upload_file('tests/dummy.txt', 'extract_bucket', s3_key) - process_and_upload_tables(mock_db, mock_existing_files, client=s3_client) - - assert 'Error uploading to S3' in caplog.text - -#@pytest.mark.describe("Helpers") -# @pytest.mark.it("Query processor returns correctly formatted dict") -# def test_process_query(): -# with patch("src.api.helpers.get_db_connection") as mock_conn: -# mock_conn().run.side_effect = db_data -# mock_conn().columns = sample_headers -# result = process_query("test query") -# assert result == sample_result
\ No newline at end of file + CreateBucketConfiguration={'LocationConstraint': 'eu-west-2'}) + s3_client.upload_file('tests/dummy_identical.csv', 'extract_bucket', s3_key) + process_and_upload_tables(mock_db(), existing_files, client=s3_client) + assert 'No new data.' in caplog.text +'''
\ No newline at end of file |
