aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlex <git@ajschof.me>2024-08-16 14:32:53 +0100
committerGitHub <noreply@github.com>2024-08-16 14:32:53 +0100
commita9f05e599417745d8281b05eb8fd7afe02a8e1ae (patch)
treec280e0e6d652fe43aa3b13850dc8d9116f9ee4d2
parentcf3d366e730e88ceea194d5b3b1d1a3ddecdd944 (diff)
parent6425cd0b5bd9afe3f0fea8fdc37cfb7fe624d0e5 (diff)
downloadde-project-bentley-a9f05e599417745d8281b05eb8fd7afe02a8e1ae.tar.gz
de-project-bentley-a9f05e599417745d8281b05eb8fd7afe02a8e1ae.zip
Merge branch 'development' into ci-cd-branch
-rw-r--r--.deepsource.toml27
-rw-r--r--DEVNOTES.md100
-rw-r--r--README.md44
-rw-r--r--src/extract_lambda.py40
-rw-r--r--terraform/lambda.tf35
-rw-r--r--tests/test_secrets_manager.py73
6 files changed, 201 insertions, 118 deletions
diff --git a/.deepsource.toml b/.deepsource.toml
new file mode 100644
index 0000000..a840b78
--- /dev/null
+++ b/.deepsource.toml
@@ -0,0 +1,27 @@
+version = 1
+
+[[analyzers]]
+name = "sql"
+
+[[analyzers]]
+name = "terraform"
+
+[[analyzers]]
+name = "python"
+
+ [analyzers.meta]
+ runtime_version = "3.x.x"
+
+[[analyzers]]
+name = "secrets"
+
+[[transformers]]
+name = "black"
+
+[[transformers]]
+name = "autopep8"
+
+[[transformers]]
+name = "ruff"
+
+
diff --git a/DEVNOTES.md b/DEVNOTES.md
deleted file mode 100644
index 00b4ddd..0000000
--- a/DEVNOTES.md
+++ /dev/null
@@ -1,100 +0,0 @@
-# Workflow
-
-## References
-
-https://nvie.com/posts/a-successful-git-branching-model/ \
-https://learn.microsoft.com/en-us/azure/devops/repos/git/merging-with-squash?view=azure-devops
-
-
-## Branching
-
-*Based off GitFlow but slightly modified*
-
-- There are two main branches
- - `main` - production-ready code
- - `development` - integration branch for features
- - `staging` - represents the current staging state
-- In addition, there are additional branches
- - Feature branches - for new features and non-urgent bugfixes
- - Hotfix branches - probably won't be used but for critical bugs in production (this is what testing should prevent)
- - Release branches - for preparation of production releases
-
-- Feature branches - e.g. `feature/short-description`
-- Bugfix branches - e.g. `bugfix/short-description`
-- Hotfix branches - e.g. `hotfix/short-description`
-- Release branches - e.g. `release/vX.Y.Z`
-
-### Examples
-```
-feature/add-data-extractor
-bugfix/fix-s3-upload-error
-hotfix/security-patch
-release/v1.0.0
-```
-
-## Environments
-
-1. Development - where active development and initial testing occur
-2. Staging - for integration testing and final checks before production
-3. Production - live and stable environment
-
-## Deployment
-
-1. `main` - represents the current production state
-2. `develop` - represents the integration branch for features and non-urgent fixes
-3. `staging` - represents the current staging state
-
-## Staging Flow
-
-1. Create feature branches from `develop` & merge completed features back into `develop`
-2. When the `develop` branch is ready for testing, create a `staging` branch from `develop`
-3. Deploy the `staging` branch to the staging environment and perform our unit-tests
-4. If staging tests pass, create a `release/vX.Y.Z` branch from `staging`
-5. Make any final adjustments in the `release/vX.Y.Z` branch
-6. Once we have approved the changes in the `release/vX.Y.Z` branch, merge into `main`
-7. Tag the release in `main`
-
-### Notes
-
-- No new features should be included in the release branches and any new features should be merged into `develop` for the next release cycle
-
-## Commit Messages
-
-Please follow the conventional commits specification:
-
-```
-<type>[optional scope]: <description>
-
-<optional body>
-
-[optional footer(s)]
-```
-
-### Types
-- feat: new features
-- fix: bugfixes
-- docs: documentation-only changes
-- style: changes that do not affect the meaning of the code
-- refactor: code changes that neither fix bugs nor adds features
-- perf: code changes that improve performance
-- test: adding tests or correcting existing tests
-- chore: changes to build process or tools/libraries (probably not needed)
-- infra: changes to infrastructure configuration (e.g. Terraform)
-
-### Examples
-```
-feat(extract): add automatic scheduling for data ingestion
-docs: update README with project setup instructions
-```
-
-Configuration files for things such as Terraform isn't native to Conventional Commits, but we can add our own:
-
-```
-infra(tf): update S3 bucket policy
-```
-
-If the Terraform change involves a fix, you may combine `fix` and `infra`:
-
-```
-fix(infra): ...
-```
diff --git a/README.md b/README.md
index 8ae0cb3..6bc75dc 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,43 @@
-# de-project-bentley \ No newline at end of file
+# ToteSys - Data Engineering Project
+
+# Summary
+The project aims to implement a data platform that can extract data from an
+operational database, archive it in a data lake, and make it easily accessible
+within a remodelled OLAP data warehouse.
+
+The solution showcases our skills in:
+
+- Python
+- PostgreSQL
+- Database modelling
+- Amazon Web Services (AWS)
+- Agile methodologies
+
+# Main Objective
+
+Our goal is to create a reliable ETL (Extract, Transform, Load) pipeline that
+can:
+
+1. Extract the data from the `totesys` operational database
+2. Store the data in AWS S3 buckets, that will form our data lake
+3. Transform the data into a suitable schema for the data warehouse
+4. Load the transformed data into the data warehouse hosted on AWS
+
+# Key Features
+
+We aim for the project to have certain features. Some are more prioritised than
+others.
+
+- [ ] Automated data ingestion from `totesys` db
+- [ ] Data storage for ingested and processed data in S3 buckets
+- [ ] Data transformation for data warehouse schema
+- [ ] Automated data loading into the data warehouse schema
+- [ ] Logging and monitoring with CloudWatch
+- [ ] Notifications for errors and successful runs (e.g. successful ingestion)
+- [ ] Visualisation of warehouse data
+
+# Test Coverage
+TBA
+
+# Contributors
+TBA \ No newline at end of file
diff --git a/src/extract_lambda.py b/src/extract_lambda.py
index fb2d7e8..f4c0c1d 100644
--- a/src/extract_lambda.py
+++ b/src/extract_lambda.py
@@ -1,5 +1,4 @@
-from pg8000.native import Connection, DatabaseError, InterfaceError
-from dotenv import dotenv_values
+from pg8000.native import Connection, InterfaceError
import boto3
import csv
from botocore.exceptions import ClientError
@@ -42,31 +41,35 @@ def lambda_handler(event, context):
'statusCode': 200,
'body': json.dumps('CSV files processed and uploaded successfully.')
}
-
except Exception as e:
logger.error(f'Error: {e}')
return {
'statusCode': 500,
'body': json.dumps('Internal server error.')
}
-
finally:
-
if db:
db.close()
-def get_config(path: str = ".env") -> dict:
- return dotenv_values(path)
+def retrieve_secrets(sm_client=boto3.client('secretsmanager'), secret_name='bentley-secrets'):
+ try:
+ response = sm_client.get_secret_value(SecretId=secret_name)
+ if 'SecretString' in response:
+ secret = json.loads(response['SecretString'])
+ return secret
+ except ClientError as e:
+ logger.error(f'Could not retrieve secrets: {e}')
+ raise e
def connect_to_database() -> Connection:
try:
- config = get_config()
- host = config["host"]
- port = config["port"]
- user = config["user"]
- password = config["password"]
- database = config["database"]
+ secrets = retrieve_secrets()
+ host = secrets["host"]
+ port = secrets["port"]
+ user = secrets["user"]
+ password = secrets["password"]
+ database = secrets["database"]
return Connection(
database=database,
@@ -79,9 +82,12 @@ def connect_to_database() -> Connection:
logger.error(f'Interface error: {i}')
raise DBConnectionException("Failed to connect to database")
+def extract_bucket(client=boto3.client('s3')):
+ response = client.list_buckets()
+ extract_bucket_filter = [bucket['Name'] for bucket in response['Buckets'] if 'extract' in bucket['Name']]
+ return extract_bucket_filter[0]
-
-def list_existing_s3_files(bucket_name='extract_bucket', client=boto3.client('s3')):
+def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client('s3')):
"""Creates a dictionary and populates it with the
results of listing the contents of the s3 bucket, then
returns the populated dictionary
@@ -90,7 +96,7 @@ def list_existing_s3_files(bucket_name='extract_bucket', client=boto3.client('s3
existing_files = {}
try:
- response = client.list_objects_v2(Bucket='extract_bucket')
+ response = client.list_objects_v2(Bucket=bucket_name)
if 'Contents' in response:
for obj in response['Contents']:
@@ -147,7 +153,7 @@ def process_and_upload_tables(db, existing_files, client=boto3.client('s3')):
## END OF NEW CODE
if existing_files[latest_s3_object_key] != new_csv_content:
try:
- client.upload_file(csv_file_path, 'extract_bucket', s3_key)
+ client.upload_file(csv_file_path, extract_bucket(), s3_key)
logger.info(f"Uploaded {s3_key} to S3.")
except ClientError as e:
logger.error(f'Error uploading to S3: {e}')
diff --git a/terraform/lambda.tf b/terraform/lambda.tf
index 72d1306..658b8c8 100644
--- a/terraform/lambda.tf
+++ b/terraform/lambda.tf
@@ -81,3 +81,38 @@ resource "aws_lambda_function" "load_lambda" {
depends_on = [aws_s3_object.load_lambda_code]
}
+
+locals {
+ layer_dir = "${path.module}/../python"
+ requirements = "${path.module}/../requirements.txt"
+ layer_zip = "${path.module}/../layer.zip"
+}
+
+resource "null_resource" "prepare_layer" {
+ triggers = {
+ requirements_hash = filesha1(local.requirements)
+ }
+ provisioner "local-exec" {
+ command = <<EOT
+ mkdir -p ${local.layer_dir}/lib/python3.8/site-packages/
+ pip install -r ${local.requirements} -t ${local.layer_dir}/lib/python3.11/site-packages/
+ cd ${local.layer_dir} && zip -r ${local.layer_zip} .
+ EOT
+}
+ }
+
+resource "aws_s3_object" "layer_zip" {
+ bucket = aws_s3_bucket.lambda_code_bucket.bucket
+ key = "layer.zip"
+ source = local.layer_zip
+ depends_on = [null_resource.prepare_layer]
+}
+
+resource "aws_lambda_layer_version" "lambda_layer" {
+ layer_name = "lambda_layer"
+ compatible_runtimes = ["python3.11"]
+ s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket
+ s3_key = aws_s3_object.layer_zip.key
+ skip_destroy = true
+ depends_on = [aws_s3_object.layer_zip]
+} \ No newline at end of file
diff --git a/tests/test_secrets_manager.py b/tests/test_secrets_manager.py
new file mode 100644
index 0000000..a30be86
--- /dev/null
+++ b/tests/test_secrets_manager.py
@@ -0,0 +1,73 @@
+from src.secrets_manager import sm_client, retrieve_secrets
+import boto3
+import botocore.exceptions
+from moto import mock_aws
+import json
+import pytest
+import os
+
+@pytest.fixture(scope='function')
+def aws_credentials():
+ """Mocked AWS Credentials for moto."""
+ os.environ["AWS_ACCESS_KEY_ID"] = "testing"
+ os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
+ os.environ["AWS_SECURITY_TOKEN"] = "testing"
+ os.environ["AWS_SESSION_TOKEN"] = "testing"
+ os.environ["AWS_DEFAULT_REGION"] = "eu-west-2"
+
+@pytest.fixture(scope='function')
+def mock_sm_client(aws_credentials):
+ with mock_aws():
+ yield boto3.client("secretsmanager")
+
+@pytest.fixture(scope='function')
+def mock_store_secret(mock_sm_client):
+ secret = {
+ "cohort_id": "test_cohort_id",
+ "user": "test_user_id",
+ "password": "test_password",
+ "host": "test_host",
+ "database": "test_database",
+ "port": "test_port"
+ }
+
+ secret_name = "test_secret"
+
+ response = mock_sm_client.create_secret(Name=secret_name, SecretString=json.dumps(secret))
+
+ return response
+
+def test_retrieves_secrets_returns_dictionary(mock_sm_client, mock_store_secret):
+ secret_name = "test_secret"
+
+ result = retrieve_secrets(mock_sm_client, secret_name)
+
+ assert isinstance(result, dict)
+
+def test_retrieves_secrets_returns_correct_keys_and_values(mock_sm_client, mock_store_secret):
+
+ secret_name = "test_secret"
+
+ result = retrieve_secrets(mock_sm_client, secret_name)
+
+ assert result["cohort_id"] == "test_cohort_id"
+ assert result["user"] == "test_user_id"
+ assert result["password"] == "test_password"
+ assert result["host"] == "test_host"
+ assert result["database"] == "test_database"
+ assert result["port"] == "test_port"
+
+def test_retrieves_secrets_raises_error_if_secret_name_incorrect_data_type(mock_sm_client):
+ secret_name = [1, 2, 3]
+
+
+ with pytest.raises(botocore.exceptions.ParamValidationError) as error:
+ retrieve_secrets(mock_sm_client, secret_name)
+
+
+def test_retrieves_secrets_raises_error_if_secret_name_does_not_exist(mock_sm_client, mock_store_secret):
+ secret_name = 'test_secret_2'
+
+
+ with pytest.raises(botocore.exceptions.ClientError) as error:
+ retrieve_secrets(mock_sm_client, secret_name) \ No newline at end of file
git.ajschof.me — hosted by ajschofield — powered by cgit