aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAng Bel <anzelikabelotelova@Anzelikas-MacBook-Air.local>2024-08-15 16:45:47 +0100
committerAng Bel <anzelikabelotelova@Anzelikas-MacBook-Air.local>2024-08-15 16:45:47 +0100
commitc9bf342c8f6038a3f5397bfc8c53d251f27e7eec (patch)
tree9658e903e9d66df57525f92439a62aafa313a564
parent7642266611b370b6e945e132c8e7b26c8d6fe9f3 (diff)
downloadde-project-bentley-c9bf342c8f6038a3f5397bfc8c53d251f27e7eec.tar.gz
de-project-bentley-c9bf342c8f6038a3f5397bfc8c53d251f27e7eec.zip
procefss_and_upload_tables test in progress
-rw-r--r--requirements.txt30
-rw-r--r--src/extract_lambda.py30
-rw-r--r--tests/dummy_identical.csv4
-rw-r--r--tests/test_extract_lambda.py47
4 files changed, 77 insertions, 34 deletions
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..6f383f9
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,30 @@
+asn1crypto==1.5.1
+boto3==1.34.159
+botocore==1.34.159
+certifi==2024.7.4
+cffi==1.17.0
+charset-normalizer==3.3.2
+cryptography==43.0.0
+idna==3.7
+iniconfig==2.0.0
+Jinja2==3.1.4
+jmespath==1.0.1
+MarkupSafe==2.1.5
+moto==5.0.12
+packaging==24.1
+pg8000==1.31.2
+pluggy==1.5.0
+pycparser==2.22
+pytest==8.3.2
+pytest-mock==3.14.0
+python-dateutil==2.9.0.post0
+python-dotenv==1.0.1
+PyYAML==6.0.2
+requests==2.32.3
+responses==0.25.3
+s3transfer==0.10.2
+scramp==1.4.5
+six==1.16.0
+urllib3==2.2.2
+Werkzeug==3.0.3
+xmltodict==0.13.0 \ No newline at end of file
diff --git a/src/extract_lambda.py b/src/extract_lambda.py
index 56b47a6..fb2d7e8 100644
--- a/src/extract_lambda.py
+++ b/src/extract_lambda.py
@@ -6,6 +6,7 @@ from botocore.exceptions import ClientError
import logging
import json
from datetime import datetime
+import re
logger = logging.getLogger()
@@ -117,9 +118,16 @@ def process_and_upload_tables(db, existing_files, client=boto3.client('s3')):
in the existing_files dictionary with the same name. If it finds any changes
to files, or new tables/files it uploads them to the s3 bucket
"""
-
+ ## NEW CODE
+ all_datetimes = []
+ for file_names in existing_files.keys():
+ datetime_str_on_s3 = ''.join(re.search(r'\/(.+/).+_(.+)\.csv',file_names).group(1,2))
+ all_datetimes.append(datetime.strptime(datetime_str_on_s3, '%Y/%m/%d/%H:%M:%S'))
+ latest_timestamp = max(all_datetimes)
+ ## END OF NEW CODE
+
tables = db.run("SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';")
-
+ print(tables)
for table in tables:
table_name = table[0]
rows = db.run(f"SELECT * FROM {table_name};")
@@ -128,17 +136,21 @@ def process_and_upload_tables(db, existing_files, client=boto3.client('s3')):
csv_file_path = f"/tmp/{table_name}.csv"
with open(csv_file_path, "w", newline='') as file:
writer = csv.writer(file)
- column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")]
+ #column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")]
+ column_names = [col_name[0] for col_name in db.run(f"SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = '{table_name}';")]
writer.writerow(column_names)
writer.writerows(rows)
-
- s3_key = f"{table_name}/{datetime.today().year}/{datetime.today().month}/{datetime.today().day}/{table_name}_{datetime.now().strftime('%H:%M:%S')}.csv"
+ s3_key = datetime.strftime(datetime.today(),f'{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv')
new_csv_content = open(csv_file_path, "r").read()
-
-
- if s3_key not in existing_files or existing_files[s3_key] != new_csv_content:
+ ## NEW CODE
+ latest_s3_object_key = datetime.strftime(latest_timestamp,f'{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv')
+ ## END OF NEW CODE
+ if existing_files[latest_s3_object_key] != new_csv_content:
try:
client.upload_file(csv_file_path, 'extract_bucket', s3_key)
logger.info(f"Uploaded {s3_key} to S3.")
except ClientError as e:
- logger.error(f'Error uploading to S3: {e}') \ No newline at end of file
+ logger.error(f'Error uploading to S3: {e}')
+ else:
+ logger.info(f"No new data.")
+ \ No newline at end of file
diff --git a/tests/dummy_identical.csv b/tests/dummy_identical.csv
new file mode 100644
index 0000000..fdd8993
--- /dev/null
+++ b/tests/dummy_identical.csv
@@ -0,0 +1,4 @@
+Food_type,Flavour,Colour
+Vegetable,Sour,Green
+Berry,Sweet,Red
+
diff --git a/tests/test_extract_lambda.py b/tests/test_extract_lambda.py
index 74d7e2c..e94a8a4 100644
--- a/tests/test_extract_lambda.py
+++ b/tests/test_extract_lambda.py
@@ -1,7 +1,7 @@
import pytest
import boto3
from moto import mock_aws
-from unittest.mock import patch
+from unittest.mock import patch, MagicMock
from unittest import TestCase
from src.extract_lambda import list_existing_s3_files, connect_to_database, DBConnectionException, process_and_upload_tables
import os
@@ -81,32 +81,29 @@ class TestConnectToDatabase:
with pytest.raises(DBConnectionException):
connect_to_database()
assert 'Interface error' in caplog.text
-
+'''
class TestProcessAndUploadTables:
- def test_error_process_and_upload_tables(mock_conn, mock_config, s3_client, caplog, mocker):
+ def test_error_process_and_upload_tables(mock_conn, mock_config, s3_client, caplog):
logger = logging.getLogger()
logger.info('Testing now.')
caplog.set_level(logging.ERROR)
-
- with patch("src.extract_lambda.Connection", autospec=True) as mock_conn:
- mock_db = connect_to_database()
- # need to add a table
- s3_key = 'dummy/2024/8/14/dummy_16:46:30.txt'
- mock_existing_files = mocker.Mock(return_value={s3_key: 'This is a test file.' })
+ ####
+ queries = ["SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';",
+ "SELECT * FROM Fruits;",
+ "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = 'Fruits'"]
+ return_values = [[['Fruits']],
+ [['Vegetable','Sour','Green'],['Berry','Sweet','Red']],
+ [['Food_type'],['Flavour'],['Colour']]]
+ vals = dict(zip(queries,return_values))
+
+ ####
+ with patch('src.extract_lambda.connect_to_database') as mock_db:
+ mock_db().run.side_effects = return_values
+ s3_key = 'Fruits/2024/08/15/Fruits_16:46:30.csv'
+ existing_files = {s3_key: 'Food_type,Flavour,Colour\nFruit,Sour,Green\nBerry,Sweet,Red'}
s3_client.create_bucket(Bucket='extract_bucket',
- CreateBucketConfiguration={
- 'LocationConstraint': 'eu-west-2'
- })
- s3_client.upload_file('tests/dummy.txt', 'extract_bucket', s3_key)
- process_and_upload_tables(mock_db, mock_existing_files, client=s3_client)
-
- assert 'Error uploading to S3' in caplog.text
-
-#@pytest.mark.describe("Helpers")
-# @pytest.mark.it("Query processor returns correctly formatted dict")
-# def test_process_query():
-# with patch("src.api.helpers.get_db_connection") as mock_conn:
-# mock_conn().run.side_effect = db_data
-# mock_conn().columns = sample_headers
-# result = process_query("test query")
-# assert result == sample_result \ No newline at end of file
+ CreateBucketConfiguration={'LocationConstraint': 'eu-west-2'})
+ s3_client.upload_file('tests/dummy_identical.csv', 'extract_bucket', s3_key)
+ process_and_upload_tables(mock_db(), existing_files, client=s3_client)
+ assert 'No new data.' in caplog.text
+''' \ No newline at end of file
git.ajschof.me — hosted by ajschofield — powered by cgit