aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlex <git@ajschof.me>2024-08-20 15:31:05 +0100
committerGitHub <noreply@github.com>2024-08-20 15:31:05 +0100
commit80f531f3756c2db095dce0b0aee30e537d711566 (patch)
tree671b2817d4576abd1132aded13f25ba545beff90
parent3ab3164c2e6f0e7a7ae6755a58914522bf3390a6 (diff)
parenta393d59e052d3a37d66f7a657a15cad1d486e3b1 (diff)
downloadde-project-bentley-80f531f3756c2db095dce0b0aee30e537d711566.tar.gz
de-project-bentley-80f531f3756c2db095dce0b0aee30e537d711566.zip
Merge pull request #76 from ajschofield/development
pr: pull development into main
-rw-r--r--.github/workflows/deploy.yml42
-rw-r--r--.gitignore2
-rw-r--r--DEVNOTES.md100
-rw-r--r--README.md52
-rw-r--r--requirements.txt30
-rwxr-xr-xscripts/deploy.sh56
-rwxr-xr-xscripts/make_layer_zip.sh8
-rw-r--r--src/extract_lambda.py227
-rw-r--r--src/load_lambda.py2
-rw-r--r--src/transform_lambda.py2
-rw-r--r--terraform/events.tf109
-rw-r--r--terraform/iam.tf202
-rw-r--r--terraform/lambda.tf146
-rw-r--r--terraform/main.tf40
-rw-r--r--terraform/s3.tf57
-rw-r--r--terraform/vars.tf53
-rw-r--r--test.py0
-rw-r--r--tests/dummy.txt1
-rw-r--r--tests/dummy_identical.csv4
-rw-r--r--tests/test_extract_lambda.py247
-rw-r--r--tests/test_secrets_manager.py84
21 files changed, 1363 insertions, 101 deletions
diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml
new file mode 100644
index 0000000..5672048
--- /dev/null
+++ b/.github/workflows/deploy.yml
@@ -0,0 +1,42 @@
+name: deploy-terraform
+
+on:
+ pull_request:
+ branches:
+ - main
+ push:
+ branches:
+ - main
+
+
+jobs:
+ deploy-terraform:
+ name: Deploy Terraform
+ runs-on: ubuntu-latest
+ #needs: run-checks (must ref on-commit.yml file)
+ environment: production
+ steps:
+ - name: Checkout Repo
+ uses: actions/checkout@v4
+
+ - name: Install Terraform
+ uses: hashicorp/setup-terraform@v3
+
+ - name: Configure AWS Credentials
+ uses: aws-actions/configure-aws-credentials@v4
+ with:
+ aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
+ aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
+ aws-region: ${{ secrets.AWS_REGION }}
+
+ - name: Terraform Init
+ working-directory: terraform
+ run: terraform init
+
+ - name: Terraform Plan
+ working-directory: terraform
+ run: terraform plan
+
+ - name: Terraform Apply
+ working-directory: terraform
+ run: terraform apply --auto-approve
diff --git a/.gitignore b/.gitignore
index cd44594..6aa03fc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,6 +10,8 @@
# Output Files
*.zip
log*
+__pycache__/
# OS-Related Files
.DS_Store
+venv \ No newline at end of file
diff --git a/DEVNOTES.md b/DEVNOTES.md
deleted file mode 100644
index 00b4ddd..0000000
--- a/DEVNOTES.md
+++ /dev/null
@@ -1,100 +0,0 @@
-# Workflow
-
-## References
-
-https://nvie.com/posts/a-successful-git-branching-model/ \
-https://learn.microsoft.com/en-us/azure/devops/repos/git/merging-with-squash?view=azure-devops
-
-
-## Branching
-
-*Based off GitFlow but slightly modified*
-
-- There are two main branches
- - `main` - production-ready code
- - `development` - integration branch for features
- - `staging` - represents the current staging state
-- In addition, there are additional branches
- - Feature branches - for new features and non-urgent bugfixes
- - Hotfix branches - probably won't be used but for critical bugs in production (this is what testing should prevent)
- - Release branches - for preparation of production releases
-
-- Feature branches - e.g. `feature/short-description`
-- Bugfix branches - e.g. `bugfix/short-description`
-- Hotfix branches - e.g. `hotfix/short-description`
-- Release branches - e.g. `release/vX.Y.Z`
-
-### Examples
-```
-feature/add-data-extractor
-bugfix/fix-s3-upload-error
-hotfix/security-patch
-release/v1.0.0
-```
-
-## Environments
-
-1. Development - where active development and initial testing occur
-2. Staging - for integration testing and final checks before production
-3. Production - live and stable environment
-
-## Deployment
-
-1. `main` - represents the current production state
-2. `develop` - represents the integration branch for features and non-urgent fixes
-3. `staging` - represents the current staging state
-
-## Staging Flow
-
-1. Create feature branches from `develop` & merge completed features back into `develop`
-2. When the `develop` branch is ready for testing, create a `staging` branch from `develop`
-3. Deploy the `staging` branch to the staging environment and perform our unit-tests
-4. If staging tests pass, create a `release/vX.Y.Z` branch from `staging`
-5. Make any final adjustments in the `release/vX.Y.Z` branch
-6. Once we have approved the changes in the `release/vX.Y.Z` branch, merge into `main`
-7. Tag the release in `main`
-
-### Notes
-
-- No new features should be included in the release branches and any new features should be merged into `develop` for the next release cycle
-
-## Commit Messages
-
-Please follow the conventional commits specification:
-
-```
-<type>[optional scope]: <description>
-
-<optional body>
-
-[optional footer(s)]
-```
-
-### Types
-- feat: new features
-- fix: bugfixes
-- docs: documentation-only changes
-- style: changes that do not affect the meaning of the code
-- refactor: code changes that neither fix bugs nor adds features
-- perf: code changes that improve performance
-- test: adding tests or correcting existing tests
-- chore: changes to build process or tools/libraries (probably not needed)
-- infra: changes to infrastructure configuration (e.g. Terraform)
-
-### Examples
-```
-feat(extract): add automatic scheduling for data ingestion
-docs: update README with project setup instructions
-```
-
-Configuration files for things such as Terraform isn't native to Conventional Commits, but we can add our own:
-
-```
-infra(tf): update S3 bucket policy
-```
-
-If the Terraform change involves a fix, you may combine `fix` and `infra`:
-
-```
-fix(infra): ...
-```
diff --git a/README.md b/README.md
index 8ae0cb3..cbb446c 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,51 @@
-# de-project-bentley \ No newline at end of file
+# ToteSys - Data Engineering Project
+
+[![Python](https://img.shields.io/badge/Python-FFD43B?style=for-the-badge&logo=python&logoColor=blue)](https://www.python.org/)
+[![AWS](https://img.shields.io/badge/Amazon_AWS-FF9900?style=for-the-badge&logo=amazonaws&logoColor=white)](https://aws.amazon.com/)
+[![Terraform](https://img.shields.io/badge/Terraform-7B42BC?style=for-the-badge&logo=terraform&logoColor=white)](https://www.terraform.io/)
+[![Postgresql](https://img.shields.io/badge/PostgreSQL-316192?style=for-the-badge&logo=postgresql&logoColor=white)](https://www.postgresql.org/)
+[![GitHub Actions](https://img.shields.io/badge/GitHub_Actions-2088FF?style=for-the-badge&logo=github-actions&logoColor=white)](https://github.com/features/actions)
+
+[![Terraform Main Deployment Workflow Status](https://img.shields.io/github/actions/workflow/status/ajschofield/de-project-bentley/deploy.yml?branch=main&style=flat-square&label=deploy)](https://github.com/ajschofield/de-project-bentley/actions/workflows/deploy.yml?query=branch%3Amain)
+[![Production Environment Status](https://img.shields.io/github/deployments/ajschofield/de-project-bentley/production?style=flat-square&label=env)](https://github.com/ajschofield/de-project-bentley/deployments/production)
+# Summary
+The project aims to implement a data platform that can extract data from an
+operational database, archive it in a data lake, and make it easily accessible
+within a remodelled OLAP data warehouse.
+
+The solution showcases our skills in:
+
+- Python
+- PostgreSQL
+- Database modelling
+- Amazon Web Services (AWS)
+- Agile methodologies
+
+# Main Objective
+
+Our goal is to create a reliable ETL (Extract, Transform, Load) pipeline that
+can:
+
+1. Extract the data from the `totesys` operational database
+2. Store the data in AWS S3 buckets, that will form our data lake
+3. Transform the data into a suitable schema for the data warehouse
+4. Load the transformed data into the data warehouse hosted on AWS
+
+# Key Features
+
+We aim for the project to have certain features. Some are more prioritised than
+others.
+
+- [ ] Automated data ingestion from `totesys` db
+- [ ] Data storage for ingested and processed data in S3 buckets
+- [ ] Data transformation for data warehouse schema
+- [ ] Automated data loading into the data warehouse schema
+- [ ] Logging and monitoring with CloudWatch
+- [ ] Notifications for errors and successful runs (e.g. successful ingestion)
+- [ ] Visualisation of warehouse data
+
+# Test Coverage
+TBA
+
+# Contributors
+TBA \ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..6f383f9
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,30 @@
+asn1crypto==1.5.1
+boto3==1.34.159
+botocore==1.34.159
+certifi==2024.7.4
+cffi==1.17.0
+charset-normalizer==3.3.2
+cryptography==43.0.0
+idna==3.7
+iniconfig==2.0.0
+Jinja2==3.1.4
+jmespath==1.0.1
+MarkupSafe==2.1.5
+moto==5.0.12
+packaging==24.1
+pg8000==1.31.2
+pluggy==1.5.0
+pycparser==2.22
+pytest==8.3.2
+pytest-mock==3.14.0
+python-dateutil==2.9.0.post0
+python-dotenv==1.0.1
+PyYAML==6.0.2
+requests==2.32.3
+responses==0.25.3
+s3transfer==0.10.2
+scramp==1.4.5
+six==1.16.0
+urllib3==2.2.2
+Werkzeug==3.0.3
+xmltodict==0.13.0 \ No newline at end of file
diff --git a/scripts/deploy.sh b/scripts/deploy.sh
new file mode 100755
index 0000000..f631bbc
--- /dev/null
+++ b/scripts/deploy.sh
@@ -0,0 +1,56 @@
+# Deploy Script
+# Description: Deploy and destroy Terraform
+# WARNING: This will most likely destroy any current infrastructure if protections
+# are not in place. Be careful!
+
+# Exit if any command has a non-zero status
+set -e
+
+# Change current directory to terraform folder at the start
+cd ../terraform/
+
+echo "WARNING: This script will destroy any infrastructure for testing."
+echo "It should not be used once a proper deployment has been setup."
+echo "Would you like to continue?"
+
+select yn in "Yes" "No"; do
+ case $yn in
+ Yes)
+ echo "Would you like to destroy the current infrastructure?"
+ select destroy_1 in "Yes" "No"; do
+ case $destroy_1 in
+ Yes)
+ terraform destroy
+ break
+ ;;
+ No)
+ echo "Skipping initial destroy..."
+ break
+ ;;
+ esac
+ done
+
+ terraform apply
+
+ echo "Would you like to destroy the newly-created infrastructure?"
+ select destroy_2 in "Yes" "No"; do
+ case $destroy_2 in
+ Yes)
+ terraform destroy
+ break
+ ;;
+ No)
+ echo "Skipping final destroy... Infrastructure will remain."
+ break
+ ;;
+ esac
+ done
+
+ break
+ ;;
+ No)
+ echo "Operation cancelled..."
+ exit
+ ;;
+ esac
+done
diff --git a/scripts/make_layer_zip.sh b/scripts/make_layer_zip.sh
new file mode 100755
index 0000000..eabe301
--- /dev/null
+++ b/scripts/make_layer_zip.sh
@@ -0,0 +1,8 @@
+# Description: Make the zip file for the layer
+
+cd "$(dirname "$0")/.."
+mkdir -p python/lib/python3.11/site-packages
+pip3 install --upgrade -r requirements.txt -t python/lib/python3.11/site-packages
+rm layer.zip
+zip -r layer.zip python
+rm -r python/
diff --git a/src/extract_lambda.py b/src/extract_lambda.py
new file mode 100644
index 0000000..24f0981
--- /dev/null
+++ b/src/extract_lambda.py
@@ -0,0 +1,227 @@
+import csv
+import json
+import logging
+import re
+from datetime import datetime
+from io import StringIO
+
+import boto3
+from botocore.exceptions import ClientError
+from pg8000.native import Connection, InterfaceError, identifier
+
+logger = logging.getLogger(__name__)
+
+logging.basicConfig(
+ format="{asctime} - {levelname} - {message}",
+ style="{",
+ datefmt="%Y-%m-%d %H:%M",
+ level=logging.DEBUG,
+)
+
+logging.getLogger("botocore").setLevel(logging.WARNING)
+
+
+class DBConnectionException(Exception):
+ """Wraps pg8000.native Error or DatabaseError."""
+
+ def __init__(self, e):
+ """Initialise with provided error message."""
+ self.message = str(e)
+ super().__init__(self.message)
+
+
+def lambda_handler(event, context):
+ """This lambda function connects to the Totesys database, lists the contents of the ingestion bucket,
+ and converts all tables to CSV and if any of those tables do not exist in, or are different to the ones in s3, it uploads them
+ it uses 3 helper functions to achieve these 3 functionalities
+ """
+ db = None
+ try:
+ db = connect_to_database()
+ existing_files = list_existing_s3_files()
+ any_changes = process_and_upload_tables(db, existing_files)
+
+ if not any_changes["updated"]:
+ logger.info("No changes detected in the database.")
+ return {
+ "statusCode": 200,
+ "body": json.dumps("No changes detected, no CSV files were uploaded."),
+ }
+ return {
+ "statusCode": 200,
+ "body": json.dumps(
+ f"""CSV files processed for {', '.join(any_changes['updated'])} and uploaded successfully.{
+ 'The following tables were not updated: '+', '.join(any_changes['no change']) if any_changes['no change'] else ''}"""
+ ),
+ }
+ except Exception as e:
+ logger.error(f"Error: {e}", exc_info=True)
+ return {"statusCode": 500, "body": json.dumps("Internal server error.")}
+ finally:
+ if db:
+ db.close()
+
+
+def retrieve_secrets():
+ secret_name = "bentley-secrets"
+ region_name = "eu-west-2"
+
+ # Create a Secrets Manager client
+ session = boto3.session.Session()
+ client = session.client(service_name="secretsmanager", region_name=region_name)
+
+ try:
+ get_secret_value_response = client.get_secret_value(SecretId=secret_name)
+ except ClientError as e:
+ logger.error(f"Failed to retrieve secret {secret_name}: {str(e)}")
+ raise e
+ except KeyError:
+ logger.error(f"Secret {secret_name} does not contain a SecretString")
+ raise ValueError(f"Secret {secret_name} does not contain a SecretString")
+
+ return get_secret_value_response["SecretString"]
+
+
+def connect_to_database() -> Connection:
+ try:
+ secrets = json.loads(retrieve_secrets())
+ host = secrets["host"]
+ port = secrets["port"]
+ user = secrets["user"]
+ password = secrets["password"]
+ database = secrets["database"]
+
+ return Connection(
+ database=database, user=user, password=password, host=host, port=port
+ )
+ except InterfaceError as i:
+ logger.error(f"Interface error: {i}")
+ raise DBConnectionException("Failed to connect to database")
+
+
+def extract_bucket(client=boto3.client("s3")):
+ response = client.list_buckets()
+ extract_bucket_filter = [
+ bucket["Name"] for bucket in response["Buckets"] if "extract" in bucket["Name"]
+ ]
+
+ return extract_bucket_filter[0]
+
+
+def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client("s3")):
+ """Creates a dictionary and populates it with the
+ results of listing the contents of the s3 bucket, then
+ returns the populated dictionary
+ """
+ logging.info("Listing existing S3 files")
+ existing_files = {}
+
+ try:
+ response = client.list_objects_v2(Bucket=bucket_name)
+
+ if "Contents" in response:
+ for obj in response["Contents"]:
+ s3_key = obj["Key"]
+ try:
+ file_obj = client.get_object(Bucket=bucket_name, Key=s3_key)
+ file_content = file_obj["Body"].read().decode("utf-8")
+ existing_files[s3_key] = file_content
+ except ClientError as e:
+ logger.error(f"Error retrieving S3 object {s3_key}: {e}")
+ else:
+ logger.error("The bucket is empty")
+ return None
+
+ except ClientError as e:
+ logger.error(f"Error listing S3 objects: {e}")
+
+ return existing_files
+
+
+def get_latest_timestamp(existing_files):
+ if existing_files:
+ all_datetimes = []
+ for file_name in existing_files.keys():
+ match = re.search(r"\/(.+/).+_(.+)\.csv", file_name)
+ if match:
+ datetime_str = "".join(match.group(1, 2))
+ all_datetimes.append(
+ datetime.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S")
+ )
+ return max(all_datetimes) if all_datetimes else datetime.min
+
+ return existing_files
+
+
+def process_and_upload_tables(db, existing_files, client=boto3.client("s3")):
+ """Creates a list of the tables from a database query and
+ then selects everything from each table in individual queries
+ it then writes each table to CSV files and compares with the item
+ in the existing_files dictionary with the same name. If it finds any changes
+ to files, or new tables/files it uploads them to the s3 bucket
+ """
+ load_status = {"updated": [], "no change": []}
+ latest_timestamp = get_latest_timestamp(existing_files)
+
+ tables = db.run(
+ """
+ SELECT table_name
+ FROM information_schema.tables
+ WHERE table_schema='public' AND table_name != '_prisma_migrations'
+ AND table_type='BASE TABLE';
+ """
+ )
+
+ for table in tables:
+ table_name = table[0]
+ base_query = f"""
+ SELECT * FROM {identifier(table_name)}
+ WHERE last_updated >= :latest;
+ """
+ latest = (
+ {
+ datetime.strftime(
+ latest_timestamp if latest_timestamp else datetime(1990, 1, 1),
+ "%Y-%m-%d %H:%M:%S",
+ )
+ },
+ )
+ logger.info(f"Processing table: {table_name}")
+ logger.info(f"Latest timestamp: {latest[0]}")
+ rows = db.run(base_query, latest=latest)
+ logger.debug(f"Rows: {rows}")
+ # Creating a temporary file path and writing the column name to it followed by each row of data
+ if rows:
+ csv_file_path = f"/tmp/{table_name}.csv"
+ with open(csv_file_path, "w", newline="") as file:
+ writer = csv.writer(file)
+ # column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")]
+ column_names = [
+ col_name[0]
+ for col_name in db.run(
+ """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS
+ WHERE table_name = :table ;""",
+ table=table_name,
+ )
+ ]
+ writer.writerow(column_names)
+ writer.writerows(rows)
+ s3_key = datetime.strftime(
+ datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv"
+ )
+
+ # Writing the new file to S3 extract bucket:
+ try:
+ client.upload_file(csv_file_path, extract_bucket(), s3_key)
+ load_status["updated"].append(table_name)
+ logger.info(f"Uploaded {s3_key} to S3.")
+ except ClientError as e:
+ logger.error(f"Error uploading to S3: {e}")
+ else:
+ load_status["no change"].append(table_name)
+ logger.info(f"No new data")
+ return load_status
+
+
+if __name__ == "__main__":
+ lambda_handler(None, None)
diff --git a/src/load_lambda.py b/src/load_lambda.py
new file mode 100644
index 0000000..c6a8e60
--- /dev/null
+++ b/src/load_lambda.py
@@ -0,0 +1,2 @@
+def lambda_handler():
+ pass
diff --git a/src/transform_lambda.py b/src/transform_lambda.py
new file mode 100644
index 0000000..c6a8e60
--- /dev/null
+++ b/src/transform_lambda.py
@@ -0,0 +1,2 @@
+def lambda_handler():
+ pass
diff --git a/terraform/events.tf b/terraform/events.tf
new file mode 100644
index 0000000..53ae10a
--- /dev/null
+++ b/terraform/events.tf
@@ -0,0 +1,109 @@
+#################
+# Random String #
+#################
+
+resource "random_string" "eventbridge_suffix" {
+ length = 8
+ special = false
+ upper = false
+}
+
+resource "random_string" "s3_ingestion_suffix" {
+ length = 8
+ special = false
+ upper = false
+}
+
+resource "random_string" "s3_transform_suffix" {
+ length = 8
+ special = false
+ upper = false
+}
+
+#############################
+# EventBridge Configuration #
+#############################
+
+resource "aws_cloudwatch_event_rule" "lambda_trigger" {
+ name = "lambda-scheduled-trigger"
+ description = "Schedule to trigger the Lambda function"
+ schedule_expression = "rate(30 minutes)"
+}
+
+resource "aws_cloudwatch_event_target" "extract_lambda_cw_event" {
+ rule = aws_cloudwatch_event_rule.lambda_trigger.name
+ target_id = "TargetFunctionV1"
+ arn = aws_lambda_function.extract_lambda.arn
+ depends_on = [aws_lambda_permission.allow_eventbridge]
+}
+
+resource "aws_lambda_permission" "allow_eventbridge" {
+ statement_id = "AllowExecutionFromEventBridge${random_string.eventbridge_suffix.result}"
+ action = "lambda:InvokeFunction"
+ function_name = aws_lambda_function.extract_lambda.function_name
+ principal = "events.amazonaws.com"
+ source_arn = aws_cloudwatch_event_rule.lambda_trigger.arn
+
+ lifecycle {
+ create_before_destroy = true
+ replace_triggered_by = [random_string.eventbridge_suffix]
+ }
+}
+
+########################################
+# S3 Extract Bucket Notification Setup #
+########################################
+
+resource "aws_lambda_permission" "allow_s3_ingestion" {
+ statement_id = "AllowS3InvokeLambdaTransform${random_string.s3_ingestion_suffix.result}"
+ action = "lambda:InvokeFunction"
+ function_name = aws_lambda_function.transform_lambda.function_name
+ principal = "s3.amazonaws.com"
+ source_arn = aws_s3_bucket.extract_bucket.arn
+
+ lifecycle {
+ create_before_destroy = true
+ replace_triggered_by = [random_string.s3_ingestion_suffix]
+ }
+}
+
+
+resource "aws_s3_bucket_notification" "extract_bucket_notification" {
+ bucket = aws_s3_bucket.extract_bucket.id
+
+ lambda_function {
+ events = ["s3:ObjectCreated:*"]
+ lambda_function_arn = aws_lambda_function.transform_lambda.arn
+ }
+
+ depends_on = [aws_lambda_permission.allow_s3_ingestion]
+}
+
+##########################################
+# S3 Transform Bucket Notification Setup #
+##########################################
+
+resource "aws_lambda_permission" "allow_s3_transform_bucket" {
+ statement_id = "AllowS3InvokeLambdaTransform${random_string.s3_transform_suffix.result}"
+ action = "lambda:InvokeFunction"
+ function_name = aws_lambda_function.transform_lambda.function_name
+ principal = "s3.amazonaws.com"
+ source_arn = aws_s3_bucket.transform_bucket.arn
+
+ lifecycle {
+ create_before_destroy = true
+ replace_triggered_by = [random_string.s3_transform_suffix]
+ }
+}
+
+
+resource "aws_s3_bucket_notification" "transform_bucket_notification" {
+ bucket = aws_s3_bucket.transform_bucket.id
+
+ lambda_function {
+ events = ["s3:ObjectCreated:*"]
+ lambda_function_arn = aws_lambda_function.transform_lambda.arn
+ }
+
+ depends_on = [aws_lambda_permission.allow_s3_transform_bucket]
+}
diff --git a/terraform/iam.tf b/terraform/iam.tf
new file mode 100644
index 0000000..3d62b69
--- /dev/null
+++ b/terraform/iam.tf
@@ -0,0 +1,202 @@
+# Description: This file contains the IAM roles and policies for the lambda functions
+########################################################################
+# IAM MULTI-ROLE SETUP
+########################################################################
+
+# DEFINE MULTI-SERVICE ROLE (lambda, s3, cloudwatch, events)
+resource "aws_iam_role" "multi_service_role" {
+ name = "multi_service_role"
+
+ assume_role_policy = jsonencode({
+ Version = "2012-10-17"
+ Statement = [
+ {
+ Action = "sts:AssumeRole"
+ Effect = "Allow"
+ Principal = {
+ Service = [
+ "lambda.amazonaws.com",
+ "scheduler.amazonaws.com"
+ ]
+ }
+ }
+ ]
+ })
+}
+
+
+########################################################################
+# S3 SETUP
+# Description: allows allows retention/tagging/access control settings
+# Lambda IAM Policy for S3
+########################################################################
+
+# S3 DEFINE POLICY
+data "aws_iam_policy_document" "s3_data_policy_doc" {
+ statement {
+ effect = "Allow"
+ actions = [
+ "s3:PutObject",
+ "s3:PutObjectRetention",
+ "s3:PutObjectTagging",
+ "s3:PutObjectAcl",
+ "s3:ListObjects",
+ "s3:ListObjectsV2",
+ "s3:GetObject"
+ ]
+ resources = [
+ "${aws_s3_bucket.extract_bucket.arn}/*",
+ "${aws_s3_bucket.transform_bucket.arn}/*",
+ "${aws_s3_bucket.lambda_code_bucket.arn}/*",
+ ]
+ }
+
+ statement {
+ effect = "Allow"
+ actions = [
+ "s3:ListBucket",
+ "s3:ListAllMyBuckets",
+ "s3:ListObjectsV2",
+ "s3:ListObjects"
+ ]
+ resources = [
+ "arn:aws:s3:::*",
+ ]
+ }
+}
+
+
+########################################################################
+# LAMBDA SETUP
+# Description: Allows Lambda permission to write to Cloudwatch logs
+########################################################################
+
+resource "aws_iam_policy" "lambda_execution_policy" {
+ name = "lambda_execution_policy"
+ path = "/"
+ description = "IAM policy for Lambda execution"
+
+ policy = jsonencode({
+ Version = "2012-10-17"
+ Statement = [
+ {
+ Effect = "Allow"
+ Action = [
+ "lambda:InvokeFunction",
+ "lambda:GetFunction"
+ ]
+ Resource = "*"
+ }
+ ]
+ }
+ )
+}
+
+########################################################################
+# CLOUDWATCH SETUP
+# Description: Give permission for Lambda to write to CloudWatch logs
+########################################################################
+
+data "aws_iam_policy_document" "cw_document" {
+ statement {
+ actions = ["logs:CreateLogGroup"]
+ resources = [
+ "arn:aws:logs:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:*"
+ ]
+ }
+
+ statement {
+ actions = [
+ "logs:CreateLogStream",
+ "logs:CreateLogGroup",
+ "logs:PutLogEvents"
+ ]
+ resources = [
+ "arn:aws:logs:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:log-group:/aws/lambda/*"
+ ]
+ }
+}
+
+resource "aws_iam_policy" "cw_policy" {
+ name = "cw_policy"
+ policy = data.aws_iam_policy_document.cw_document.json
+}
+
+########################################################################
+# POLICY WRITE & ATTACH
+########################################################################
+
+# S3 WRITE POLICY
+resource "aws_iam_policy" "s3_write_policy" {
+ policy = data.aws_iam_policy_document.s3_data_policy_doc.json
+}
+
+resource "aws_iam_role_policy_attachment" "s3_attachment" {
+ role = aws_iam_role.multi_service_role.name
+ policy_arn = aws_iam_policy.s3_write_policy.arn
+}
+
+resource "aws_iam_role_policy_attachment" "lambda_attachment" {
+ role = aws_iam_role.multi_service_role.name
+ policy_arn = aws_iam_policy.lambda_execution_policy.arn
+}
+
+resource "aws_iam_role_policy_attachment" "cw_attachment" {
+ role = aws_iam_role.multi_service_role.name
+ policy_arn = aws_iam_policy.cw_policy.arn
+}
+
+###################
+# EVENTS POLICIES #
+###################
+
+data "aws_iam_policy_document" "cloudwatch_events_policy" {
+ statement {
+ actions = [
+ "events:PutRule",
+ "events:PutTargets",
+ "events:RemoveTargets",
+ "events:DeleteRule",
+ "events:PutEvents"
+ ]
+ resources = ["*"]
+ effect = "Allow"
+ }
+}
+
+resource "aws_iam_policy" "cloudwatch_events_policy" {
+ name = "cloudwatch_events_policy"
+ policy = data.aws_iam_policy_document.cloudwatch_events_policy.json
+}
+
+resource "aws_iam_role_policy_attachment" "cloudwatch_events_attachment" {
+ role = aws_iam_role.multi_service_role.name
+ policy_arn = aws_iam_policy.cloudwatch_events_policy.arn
+}
+
+#########################
+# SECRETS MANAGER SETUP #
+#########################
+
+# Policy Doc
+data "aws_iam_policy_document" "secrets_manager_policy_doc" {
+ statement {
+ effect = "Allow"
+ actions = [
+ "secretsmanager:GetSecretValue"
+ ]
+ resources = ["arn:aws:secretsmanager:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:secret:bentley-secrets-Na0yc8"]
+ }
+}
+
+# SM Policy Resource
+resource "aws_iam_policy" "secrets_manager_policy" {
+ name = "secrets_manager_policy"
+ policy = data.aws_iam_policy_document.secrets_manager_policy_doc.json
+}
+
+# Attach SM Policy to Role
+resource "aws_iam_role_policy_attachment" "secrets_manager_attachment" {
+ role = aws_iam_role.multi_service_role.name
+ policy_arn = aws_iam_policy.secrets_manager_policy.arn
+}
diff --git a/terraform/lambda.tf b/terraform/lambda.tf
new file mode 100644
index 0000000..f8e7515
--- /dev/null
+++ b/terraform/lambda.tf
@@ -0,0 +1,146 @@
+####################
+# Common Variables #
+####################
+
+locals {
+ layer_dir = "../"
+ layer_zip = "layer.zip"
+ layer_name = "lambda_layer"
+ script_dir = "../scripts"
+ layer_zip_path = "${local.layer_dir}/${local.layer_zip}"
+}
+
+######################
+# Lambda Layer Setup #
+######################
+
+resource "null_resource" "prepare_layer" {
+
+ # New change: only run the script if the layer zip does not exist
+
+ triggers = {
+ layer_zip_exists = fileexists(local.layer_zip_path) ? "exists" : "not_exists"
+ }
+
+ provisioner "local-exec" {
+ command = "if [ ! -f ${local.layer_zip_path} ]; then bash ${local.script_dir}/make_layer_zip.sh; fi"
+ }
+}
+
+resource "aws_s3_object" "lambda_layer_zip" {
+ bucket = aws_s3_bucket.lambda_code_bucket.id #bucket instead of id
+ key = "${local.layer_name}/${local.layer_zip}"
+ source = "${local.layer_dir}/${local.layer_zip}"
+ depends_on = [null_resource.prepare_layer]
+ etag = fileexists(local.layer_zip_path) ? filemd5(local.layer_zip_path) : null
+}
+
+resource "aws_lambda_layer_version" "lambda_layer" {
+ layer_name = local.layer_name
+ compatible_runtimes = ["python3.11"]
+ s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket
+ s3_key = aws_s3_object.lambda_layer_zip.key
+ source_code_hash = fileexists(local.layer_zip_path) ? filebase64sha256(local.layer_zip_path) : null
+ skip_destroy = true
+ depends_on = [aws_s3_object.lambda_layer_zip]
+}
+
+###########################
+# Extract Lambda Function #
+###########################
+
+data "archive_file" "extract_lambda_zip" {
+ type = "zip"
+ source_file = "${path.module}/../src/extract_lambda.py"
+ output_path = "${path.module}/../extract_function.zip"
+}
+resource "aws_s3_object" "extract_lambda_code" {
+ bucket = aws_s3_bucket.lambda_code_bucket.bucket
+ key = "${var.extract_lambda_name}/extract_function.zip"
+ source = data.archive_file.extract_lambda_zip.output_path
+ etag = filemd5(data.archive_file.extract_lambda_zip.output_path)
+}
+
+resource "aws_lambda_function" "extract_lambda" {
+ function_name = var.extract_lambda_name
+ s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket
+ s3_key = aws_s3_object.extract_lambda_code.key
+ layers = [aws_lambda_layer_version.lambda_layer.arn]
+ role = aws_iam_role.multi_service_role.arn
+ handler = "extract_lambda.lambda_handler"
+ runtime = "python3.11"
+ source_code_hash = data.archive_file.extract_lambda_zip.output_base64sha256
+
+ lifecycle {
+ create_before_destroy = true
+ }
+
+ depends_on = [aws_s3_object.extract_lambda_code]
+}
+
+#############################
+# Transform Lambda Function #
+#############################
+
+data "archive_file" "transform_lambda_zip" {
+ type = "zip"
+ source_file = "${path.module}/../src/transform_lambda.py"
+ output_path = "${path.module}/../transform_function.zip"
+}
+resource "aws_s3_object" "transform_lambda_code" {
+ bucket = aws_s3_bucket.lambda_code_bucket.bucket
+ key = "${var.transform_lambda_name}/transform_function.zip"
+ source = data.archive_file.transform_lambda_zip.output_path
+ etag = filemd5(data.archive_file.transform_lambda_zip.output_path)
+}
+
+resource "aws_lambda_function" "transform_lambda" {
+ function_name = var.transform_lambda_name
+ s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket
+ s3_key = aws_s3_object.transform_lambda_code.key
+ layers = [aws_lambda_layer_version.lambda_layer.arn]
+ role = aws_iam_role.multi_service_role.arn
+ handler = "transform_lambda.lambda_handler"
+ runtime = "python3.11"
+ source_code_hash = data.archive_file.transform_lambda_zip.output_base64sha256
+
+ lifecycle {
+ create_before_destroy = true
+ }
+
+ depends_on = [aws_s3_object.transform_lambda_code]
+}
+
+########################
+# Load Lambda Function #
+########################
+
+data "archive_file" "load_lambda_zip" {
+ type = "zip"
+ source_file = "${path.module}/../src/load_lambda.py"
+ output_path = "${path.module}/../load_function.zip"
+}
+resource "aws_s3_object" "load_lambda_code" {
+ bucket = aws_s3_bucket.lambda_code_bucket.bucket
+ key = "${var.load_lambda_name}/load_function.zip"
+ source = data.archive_file.load_lambda_zip.output_path
+ etag = filemd5(data.archive_file.load_lambda_zip.output_path)
+}
+
+resource "aws_lambda_function" "load_lambda" {
+ function_name = var.load_lambda_name
+ s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket
+ s3_key = aws_s3_object.load_lambda_code.key
+ layers = [aws_lambda_layer_version.lambda_layer.arn]
+ role = aws_iam_role.multi_service_role.arn
+ handler = "load_lambda.lambda_handler"
+ runtime = "python3.11"
+ source_code_hash = data.archive_file.load_lambda_zip.output_base64sha256
+
+ lifecycle {
+ create_before_destroy = true
+ }
+
+ depends_on = [aws_s3_object.load_lambda_code]
+}
+
diff --git a/terraform/main.tf b/terraform/main.tf
new file mode 100644
index 0000000..6577b70
--- /dev/null
+++ b/terraform/main.tf
@@ -0,0 +1,40 @@
+terraform {
+ required_version = ">= 1.8.0"
+ required_providers {
+ aws = {
+ source = "hashicorp/aws"
+ version = "~>5.0"
+ }
+ null = {
+ source = "hashicorp/null"
+ version = "~>3.2.2"
+ }
+ archive = {
+ source = "hashicorp/archive"
+ version = "~>2.5.0"
+ }
+ random = {
+ source = "hashicorp/random"
+ version = "~>3.6.2"
+ }
+ }
+ backend "s3" {
+ bucket = "bentley-project-secrets"
+ key = "bentley-project/terraform.tfstate"
+ region = "eu-west-2"
+ encrypt = true
+ }
+}
+
+provider "aws" {
+ region = "eu-west-2"
+ default_tags {
+ tags = {
+ ProjectName = var.project_name
+ Environment = var.environment
+ ManagedBy = "Terraform"
+ GitHubRepo = var.github_repo
+ Team = var.team_name
+ }
+ }
+}
diff --git a/terraform/s3.tf b/terraform/s3.tf
new file mode 100644
index 0000000..14e8835
--- /dev/null
+++ b/terraform/s3.tf
@@ -0,0 +1,57 @@
+########################
+# EXTRACT BUCKET SETUP #
+########################
+
+resource "aws_s3_bucket" "extract_bucket" {
+ bucket_prefix = "${var.s3_extract_bucket_name}-"
+ force_destroy = true
+ tags = {
+ Name = "Ingestion Bucket"
+ }
+}
+
+resource "aws_s3_bucket_versioning" "extract_bucket_versioning" {
+ bucket = aws_s3_bucket.extract_bucket.id
+ versioning_configuration {
+ status = "Enabled"
+ }
+}
+
+##########################
+# TRANSFORM BUCKET SETUP #
+##########################
+
+resource "aws_s3_bucket" "transform_bucket" {
+ bucket_prefix = "${var.s3_transform_bucket_name}-"
+ force_destroy = true
+ tags = {
+ Name = "Transform Bucket"
+ }
+}
+
+
+resource "aws_s3_bucket_versioning" "transform_bucket_versioning" {
+ bucket = aws_s3_bucket.transform_bucket.id
+ versioning_configuration {
+ status = "Enabled"
+ }
+}
+
+#######################
+# LAMBDA BUCKET SETUP #
+#######################
+
+resource "aws_s3_bucket" "lambda_code_bucket" {
+ bucket_prefix = "${var.s3_code_bucket_name}-"
+ force_destroy = true
+ tags = {
+ Name = "Lambda Bucket"
+ }
+}
+
+resource "aws_s3_bucket_versioning" "lambda_bucket_versioning" {
+ bucket = aws_s3_bucket.lambda_code_bucket.id
+ versioning_configuration {
+ status = "Enabled"
+ }
+}
diff --git a/terraform/vars.tf b/terraform/vars.tf
new file mode 100644
index 0000000..b3e3e47
--- /dev/null
+++ b/terraform/vars.tf
@@ -0,0 +1,53 @@
+variable "s3_extract_bucket_name" {
+ type = string
+ default = "extract-bucket"
+}
+
+variable "s3_transform_bucket_name" {
+ type = string
+ default = "transform-bucket"
+}
+
+variable "s3_code_bucket_name" {
+ type = string
+ default = "lambda-bucket"
+}
+
+variable "extract_lambda_name" {
+ type = string
+ default = "extract-lambda"
+}
+
+variable "transform_lambda_name" {
+ type = string
+ default = "transform-lambda"
+}
+
+variable "load_lambda_name" {
+ type = string
+ default = "load-lambda"
+}
+
+variable "project_name" {
+ type = string
+ default = "tt"
+}
+
+variable "environment" {
+ type = string
+ default = "dev"
+}
+
+variable "github_repo" {
+ type = string
+ default = "de-project-bentley"
+}
+
+variable "team_name" {
+ type = string
+ default = "Team-Bentley"
+}
+
+data "aws_caller_identity" "current" {}
+
+data "aws_region" "current" {}
diff --git a/test.py b/test.py
deleted file mode 100644
index e69de29..0000000
--- a/test.py
+++ /dev/null
diff --git a/tests/dummy.txt b/tests/dummy.txt
new file mode 100644
index 0000000..af27ff4
--- /dev/null
+++ b/tests/dummy.txt
@@ -0,0 +1 @@
+This is a test file. \ No newline at end of file
diff --git a/tests/dummy_identical.csv b/tests/dummy_identical.csv
new file mode 100644
index 0000000..e44e9fc
--- /dev/null
+++ b/tests/dummy_identical.csv
@@ -0,0 +1,4 @@
+Food_type,Flavour,Colour,last_updated
+Vegetable,Sour,Green,2022-11-03 14:20:49.962
+Berry,Sweet,Red,2022-11-03 14:20:49.962
+
diff --git a/tests/test_extract_lambda.py b/tests/test_extract_lambda.py
new file mode 100644
index 0000000..548ce67
--- /dev/null
+++ b/tests/test_extract_lambda.py
@@ -0,0 +1,247 @@
+import boto3.exceptions
+import botocore.exceptions
+import pytest
+import boto3
+from moto import mock_aws
+from unittest.mock import patch, MagicMock
+from unittest import TestCase
+import os
+import logging
+import json
+from src.extract_lambda import (
+ list_existing_s3_files,
+ connect_to_database,
+ DBConnectionException,
+ lambda_handler,
+ process_and_upload_tables,
+ retrieve_secrets,
+ extract_bucket,
+)
+
+
+@pytest.fixture(scope="class")
+def mock_config():
+ env_vars = {
+ "host": "abc",
+ "port": "5432",
+ "user": "def",
+ "password": "password",
+ "database": "db",
+ }
+ with patch(
+ "src.extract_lambda.retrieve_secrets", return_value=env_vars
+ ) as mock_config:
+ yield mock_config
+
+
+@pytest.fixture(scope="class")
+def aws_credentials():
+ os.environ["AWS_ACCESS_KEY_ID"] = "testing"
+ os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
+ os.environ["AWS_SECURITY_TOKEN"] = "testing"
+ os.environ["AWS_SESSION_TOKEN"] = "testing"
+ os.environ["AWS_DEFAULT_REGION"] = "eu-west-2"
+
+
+@pytest.fixture(scope="class")
+def s3_client(aws_credentials):
+ with mock_aws():
+ yield boto3.client("s3")
+
+
+@pytest.fixture(scope="class")
+def s3_mock_bucket(s3_client):
+ bucket = s3_client.create_bucket(
+ Bucket="extract_bucket",
+ CreateBucketConfiguration={"LocationConstraint": "eu-west-2"},
+ )
+ return bucket
+
+
+class TestLambdaHandler:
+ def test_files_processed_and_uploaded_successfully(self, mocker):
+ mock_db = MagicMock()
+ mock_db.run.side_effect = [
+ [["Fruits"]],
+ [["Vegetable", "Sour", "Green"], ["Berry", "Sweet", "Red"]],
+ [["Food_type"], ["Flavour"], ["Colour"]],
+ ]
+ mock_db.columns.return_value = [
+ {"name": "Food_type"},
+ {"name": "Flavour"},
+ {"name": "Colour"},
+ ]
+ with patch("src.extract_lambda.connect_to_database", return_value=mock_db):
+ mock_process_and_upload_tables = mocker.patch(
+ "src.extract_lambda.process_and_upload_tables",
+ return_value={
+ "updated": ["Fruits"],
+ "no change": ["Vegetable", "Berry"],
+ },
+ )
+ mock_list_existing_s3_files = mocker.patch(
+ "src.extract_lambda.list_existing_s3_files", return_value={}
+ )
+ event = {}
+ context = {}
+ response = lambda_handler(event, context)
+ assert response["statusCode"] == 200
+ assert json.loads(response["body"]) == (
+ "CSV files processed for Fruits and uploaded successfully."
+ "The following tables were not updated: Vegetable, Berry"
+ )
+ mock_list_existing_s3_files.assert_called_once()
+ mock_process_and_upload_tables.assert_called_once_with(mock_db, {})
+ mock_db.close.assert_called_once()
+
+ def test_no_changes_detected_no_files_uploaded(self, mocker):
+ mock_db = MagicMock()
+ mock_db.run.side_effect = [
+ [["Fruits"]],
+ [["Vegetable", "Sour", "Green"], ["Berry", "Sweet", "Red"]],
+ [["Food_type"], ["Flavour"], ["Colour"]],
+ ]
+ mock_db.columns.return_value = [
+ {"name": "Food_type"},
+ {"name": "Flavour"},
+ {"name": "Colour"},
+ ]
+
+ with patch("src.extract_lambda.connect_to_database", return_value=mock_db):
+ mock_process_and_upload_tables = mocker.patch(
+ "src.extract_lambda.process_and_upload_tables",
+ return_value={"updated": [], "no change": ["Fruits"]},
+ )
+ mock_list_existing_s3_files = mocker.patch(
+ "src.extract_lambda.list_existing_s3_files", return_value={}
+ )
+ event = {}
+ context = {}
+ response = lambda_handler(event, context)
+ assert response["statusCode"] == 200
+ assert (
+ json.loads(response["body"])
+ == "No changes detected, no CSV files were uploaded."
+ )
+ mock_list_existing_s3_files.assert_called_once()
+ mock_process_and_upload_tables.assert_called_once_with(mock_db, {})
+ mock_db.close.assert_called_once()
+
+ def test_exception_error(self, mocker):
+ with patch(
+ "src.extract_lambda.connect_to_database",
+ side_effect=Exception("Database connection error"),
+ ):
+ mock_process_and_upload_tables = mocker.patch(
+ "src.extract_lambda.process_and_upload_tables"
+ )
+ mock_list_existing_s3_files = mocker.patch(
+ "src.extract_lambda.list_existing_s3_files"
+ )
+ event = {}
+ context = {}
+ response = lambda_handler(event, context)
+ assert response["statusCode"] == 500
+ assert json.loads(response["body"]) == "Internal server error."
+ mock_list_existing_s3_files.assert_not_called()
+ mock_process_and_upload_tables.assert_not_called()
+
+
+class TestExtractBucket:
+ def test_extract_bucket_returns_bucket_name(self, s3_client, s3_mock_bucket):
+ result = extract_bucket(s3_client)
+ assert result == "extract_bucket"
+
+ def test_bucket_returns_first_bucket(self, s3_client):
+ bucket1 = s3_client.create_bucket(
+ Bucket="bucket1",
+ CreateBucketConfiguration={"LocationConstraint": "eu-west-2"},
+ )
+ result = extract_bucket(s3_client)
+ assert result == "extract_bucket"
+
+ def test_returns_index_error_if_no_buckets(self, s3_client):
+ s3_client.delete_bucket(Bucket="extract_bucket")
+ s3_client.delete_bucket(Bucket="bucket1")
+
+ with pytest.raises(IndexError, match="list index out of range"):
+ extract_bucket(s3_client)
+
+
+class TestListExistingS3Files:
+ def test_error_if_no_bucket(self, s3_client, caplog):
+ logger = logging.getLogger()
+ logger.info("Testing now.")
+ caplog.set_level(logging.ERROR)
+ list_existing_s3_files(client=s3_client)
+ assert "Error listing S3 objects" in caplog.text
+
+ def test_error_if_bucket_is_empty(self, s3_client, caplog, s3_mock_bucket):
+ list_existing_s3_files("extract_bucket", client=s3_client)
+ assert "The bucket is empty" in caplog.text
+
+ def test_retrieves_file_content(self, s3_client, caplog, s3_mock_bucket):
+ s3_client.upload_file("tests/dummy.txt", "extract_bucket", "dummy.txt")
+ result = list_existing_s3_files("extract_bucket", client=s3_client)
+ assert list(result.values()) == ["This is a test file."]
+
+
+class TestConnectToDatabase:
+ def test_connect_to_database(mock_conn, mock_config):
+ with patch("src.extract_lambda.Connection", autospec=True) as mock_conn:
+ connect_to_database()
+ mock_conn.assert_called_with(
+ host="abc", user="def", port="5432", password="password", database="db"
+ )
+
+ def test_database_error(self, mock_config): # had mock_config in param
+ with pytest.raises(DBConnectionException):
+ connect_to_database()
+
+ def test_logs_interface_error(self, caplog):
+ logger = logging.getLogger()
+ logger.info("Testing now.")
+ caplog.set_level(logging.ERROR)
+ with pytest.raises(DBConnectionException):
+ connect_to_database()
+ assert "Interface error" in caplog.text
+
+
+class TestProcessAndUploadTables:
+ def test_error_process_and_upload_tables(self, mock_conn, s3_client, caplog):
+ caplog.set_level(logging.INFO)
+
+ # Mock return values for database queries
+ queries = [
+ "SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';",
+ "SELECT * FROM Fruits WHERE last_updated > :latest;",
+ "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = 'Fruits';",
+ ]
+ return_values = [
+ [["Fruits"]],
+ [], # No new rows with a more recent last_updated timestamp
+ [["Food_type"], ["Flavour"], ["Colour"], ["last_updated"]],
+ ]
+ vals = dict(zip(queries, return_values))
+
+ # Patch the database connection and set return values for queries
+ with patch("src.extract_lambda.Connection") as mock_db:
+ mock_db().run.side_effect = return_values
+ s3_key = "Fruits/2024/08/15/Fruits_16:46:30.csv"
+ existing_files = {
+ s3_key: "Food_type,Flavour,Colour,last_updated\nVegetable,Sour,Green,2022-11-03 14:20:49.962\nBerry,Sweet,Red,2022-11-03 14:20:49.962"
+ }
+
+ # Simulate S3 bucket and file setup
+ s3_client.create_bucket(
+ Bucket="test_extract_bucket",
+ CreateBucketConfiguration={"LocationConstraint": "eu-west-2"},
+ )
+ s3_client.upload_file(
+ "tests/dummy_identical.csv", "test_extract_bucket", s3_key
+ )
+
+ # Run the process_and_upload_tables function
+ process_and_upload_tables(mock_db(), existing_files, client=s3_client)
+ # Assert that the log contains "No new data"
+ assert "No new data" in caplog.text
diff --git a/tests/test_secrets_manager.py b/tests/test_secrets_manager.py
new file mode 100644
index 0000000..609c572
--- /dev/null
+++ b/tests/test_secrets_manager.py
@@ -0,0 +1,84 @@
+from src.secrets_manager import sm_client, retrieve_secrets
+import boto3
+import botocore.exceptions
+from moto import mock_aws
+import json
+import pytest
+import os
+
+
+@pytest.fixture(scope="function")
+def aws_credentials():
+ """Mocked AWS Credentials for moto."""
+ os.environ["AWS_ACCESS_KEY_ID"] = "testing"
+ os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
+ os.environ["AWS_SECURITY_TOKEN"] = "testing"
+ os.environ["AWS_SESSION_TOKEN"] = "testing"
+ os.environ["AWS_DEFAULT_REGION"] = "eu-west-2"
+
+
+@pytest.fixture(scope="function")
+def mock_sm_client(aws_credentials):
+ with mock_aws():
+ yield boto3.client("secretsmanager")
+
+
+@pytest.fixture(scope="function")
+def mock_store_secret(mock_sm_client):
+ secret = {
+ "cohort_id": "test_cohort_id",
+ "user": "test_user_id",
+ "password": "test_password",
+ "host": "test_host",
+ "database": "test_database",
+ "port": "test_port",
+ }
+
+ secret_name = "test_secret"
+
+ response = mock_sm_client.create_secret(
+ Name=secret_name, SecretString=json.dumps(secret)
+ )
+
+ return response
+
+
+def test_retrieves_secrets_returns_dictionary(mock_sm_client, mock_store_secret):
+ secret_name = "test_secret"
+
+ result = retrieve_secrets(mock_sm_client, secret_name)
+
+ assert isinstance(result, dict)
+
+
+def test_retrieves_secrets_returns_correct_keys_and_values(
+ mock_sm_client, mock_store_secret
+):
+ secret_name = "test_secret"
+
+ result = retrieve_secrets(mock_sm_client, secret_name)
+
+ assert result["cohort_id"] == "test_cohort_id"
+ assert result["user"] == "test_user_id"
+ assert result["password"] == "test_password"
+ assert result["host"] == "test_host"
+ assert result["database"] == "test_database"
+ assert result["port"] == "test_port"
+
+
+def test_retrieves_secrets_raises_error_if_secret_name_incorrect_data_type(
+ mock_sm_client,
+):
+ secret_name = [1, 2, 3]
+
+ with pytest.raises(botocore.exceptions.ParamValidationError) as error:
+ retrieve_secrets(mock_sm_client, secret_name)
+
+
+def test_retrieves_secrets_raises_error_if_secret_name_does_not_exist(
+ mock_sm_client, mock_store_secret
+):
+ secret_name = "test_secret_2"
+
+ with pytest.raises(botocore.exceptions.ClientError) as error:
+ retrieve_secrets(mock_sm_client, secret_name)
git.ajschof.me — hosted by ajschofield — powered by cgit