about | summary | refs | log | tree | commit | diff | stats
path: root/obfuscator
diff options
context:
space:
mode:
author: Alex <git@ajschof.me> 2025-02-19 15:58:28 +0000
committer: GitHub <noreply@github.com> 2025-02-19 15:58:28 +0000
commit: 4066bf747e1e4c938526957c119f3f1485ee251e (patch)
tree: 9a1e95f4ccbdd04e19d67a6c13641a19c4d4f3e0 /obfuscator
parent: f24955044c4c05e37aba4efb505ec63b44113912 (diff)
parent: 5402af2c7198a685a57a05e29a869e1e72a6b877 (diff)
download: gdpr-obfuscator-4066bf747e1e4c938526957c119f3f1485ee251e.tar.gz
download: gdpr-obfuscator-4066bf747e1e4c938526957c119f3f1485ee251e.zip
Merge pull request #8 from ajschofield/refining-phase
mostly minor changes (fixing things up)
Diffstat (limited to 'obfuscator')
-rw-r--r-- obfuscator/csv_reader.py | 97
-rw-r--r-- obfuscator/csv_writer.py | 26
-rw-r--r-- obfuscator/logger.py | 40
-rw-r--r-- obfuscator/obfuscate.py | 14
-rw-r--r-- obfuscator/read.py | 89
-rw-r--r-- obfuscator/utils.py | 21
-rw-r--r-- obfuscator/write.py | 28
7 files changed, 161 insertions, 154 deletions
diff --git a/obfuscator/csv_reader.py b/obfuscator/csv_reader.py
deleted file mode 100644
index 8f4ebea..0000000
--- a/obfuscator/csv_reader.py
+++ /dev/null
@@ -1,97 +0,0 @@
-import csv
-import io
-import boto3
-import os
-from typing import List, Dict
-from obfuscator.logger import get_logger
-from obfuscator.utils import get_s3_path
-
-# Create the logger
-logger = get_logger("CSVReader")
-
-# Putting the CSV reading components into a class may seem like overkill
-# for a simple script, but it allows for better organization and scalability.
-# @staticmethod is used to define the method without an instance of the class
-# being required. The methods could be defined just as functions, and this
-# may still be changed.
-
-
-class CSVReader:
- """
- A class to read CSV data from a local file, S3 object, or string. Near
- the project completion, support for JSON/Parquet files will be added.
- """
-
- @staticmethod
- def read_local(path) -> List[Dict[str, str]]:
- """
- A method to read a local CSV file and return the data as a list of
- dictionaries.
- """
- # Log the path of the file being read for debugging
- logger.debug(f"Reading local CSV from: {path}")
-
- # Attempt to read the file and return the data as a list of dictionaries
- # However, if the file isn't found or there is a generic exception, log
- # the error and raise an exception
- try:
- with open(path, mode="r", encoding="utf-8") as f:
- reader = csv.DictReader(f)
- return [dict(row) for row in reader]
- except FileNotFoundError:
- logger.error(f"File not found: {path}")
- raise
- except Exception as e:
- logger.error(f"Error reading file: {e}")
-
- @staticmethod
- def read_s3(path) -> List[Dict[str, str]]:
- """
- A method to read an S3 object containing CSV data
- and return the data as a list of dictionaries.
- """
- bucket, key = get_s3_path(path)
- logger.debug(f"Reading S3 CSV from: {bucket}/{key}")
-
- # If DEBUG=TRUE, use the localstack endpoint for testing
- if os.getenv("DEBUG", "FALSE").upper() == "TRUE":
- localstack_endpoint = "http://localhost.localstack.cloud:4566"
- logger.debug("Using LocalStack endpoint for S3")
- client = boto3.client(
- "s3",
- endpoint_url=localstack_endpoint,
- aws_access_key_id="dummy",
- aws_secret_access_key="dummy",
- )
- logger.debug(f"endpoint_url: {localstack_endpoint}")
- else:
- client = boto3.client("s3")
-
- try:
- # Attempt to read the S3 object and return the data as a list of dictionaries
- response = client.get_object(Bucket=bucket, Key=key)
- logger.info("S3 object read successfully")
- # Read and decode the content
- content = response["Body"].read().decode("utf-8")
- # Even though the read_string method was only created for testing,
- # it can be reused here to read and return the CSV data
- return CSVReader.read_string(content)
- # TODO: Add more specific exceptions to catch
- except Exception as e:
- logger.error(f"Error reading S3 object: {e}")
- raise
-
- @staticmethod
- def read_string(content: str) -> List[Dict[str, str]]:
- """
- A method to read CSV data from a string and return the data as a list
- of dictionaries.
- """
- # If the content is empty, return an empty list
- if not content.strip():
- return []
-
- # Treat the string as a file-like object and return as list of dictionaries
- f = io.StringIO(content)
- reader = csv.DictReader(f)
- return [dict(row) for row in reader]
diff --git a/obfuscator/csv_writer.py b/obfuscator/csv_writer.py
deleted file mode 100644
index aa5ac3f..0000000
--- a/obfuscator/csv_writer.py
+++ /dev/null
@@ -1,26 +0,0 @@
-import csv
-import io
-from typing import List, Dict
-from obfuscator.logger import get_logger
-
-# Create the logger
-logger = get_logger("CSVWriter")
-
-
-def create_byte_stream(data: List[Dict[str, str]]) -> bytes:
- if not data:
- logger.info("No valid data was provided to write")
- return b""
-
- output = io.StringIO()
-
- headers = list(data[0].keys())
-
- writer = csv.DictWriter(output, fieldnames=headers)
- writer.writeheader()
- writer.writerows(data)
-
- csv_string = output.getvalue()
- logger.debug(f"CSV data: {csv_string}")
-
- return csv_string.encode("utf-8")
diff --git a/obfuscator/logger.py b/obfuscator/logger.py
index ca41e95..140fa8f 100644
--- a/obfuscator/logger.py
+++ b/obfuscator/logger.py
@@ -1,24 +1,36 @@
import logging
import os
+from enum import Enum
-def get_logger(name: str) -> logging.Logger:
- logger = logging.getLogger(name)
+class LogLevel(Enum):
+ DEBUG = logging.DEBUG
+ INFO = logging.INFO
+ WARNING = logging.WARNING
+ ERROR = logging.ERROR
+ CRITICAL = logging.CRITICAL
+
- if not logger.hasHandlers():
- if os.getenv("DEBUG", "FALSE").upper() == "TRUE":
- log_level = logging.DEBUG
- else:
- log_level = logging.INFO
+def get_logger(name: str, level: LogLevel = LogLevel.INFO) -> logging.Logger:
+ if isinstance(level, str):
+ try:
+ level = LogLevel[level.upper()]
+ except KeyError:
+ raise ValueError(
+ f"Invalid log level '{level}'. Choose from: {', '.join(l.name for l in LogLevel)}"
+ )
- logger.setLevel(log_level)
+ logger = logging.getLogger(name)
- handler = logging.StreamHandler()
- formatting = logging.Formatter(
- "%(asctime)s - %(levelname)s - %(name)s - %(message)s"
- )
- handler.setFormatter(formatting)
+ if logger.hasHandlers():
+ logger.handlers.clear()
- logger.addHandler(handler)
+ handler = logging.StreamHandler()
+ logger.setLevel(level.value)
+ formatting = logging.Formatter(
+ "[%(asctime)s] - %(levelname)s::%(name)s - %(message)s"
+ )
+ handler.setFormatter(formatting)
+ logger.addHandler(handler)
return logger
diff --git a/obfuscator/obfuscate.py b/obfuscator/obfuscate.py
index 3f589cb..cd12b6d 100644
--- a/obfuscator/obfuscate.py
+++ b/obfuscator/obfuscate.py
@@ -1,8 +1,7 @@
from typing import List, Dict
from obfuscator.logger import get_logger
-# Create the logger
-logger = get_logger("Obfuscator")
+logger = get_logger("OBFUSCATE")
def obfuscate(
@@ -12,14 +11,15 @@ def obfuscate(
A function to obfuscate PII fields in a list of dictionaries, replacing
sensitive values with a string of asterisks.
"""
- # If no data is provided, log a message and return an empty list
if not data:
- logger.info("No valid data was provided to obfuscate")
+ logger.error(
+ "Invalid or empty data was provided to obfuscate. Returning empty list."
+ )
return []
+ if not pii_fields:
+ logger.error("No PII fields provided to obfuscate. Returning data unchanged.")
+ return data
- # Obfuscate the PII fields in each record using a list/dict comprehension
- # This code is good but makes debugging a bit tricky. I may consider
- # breaking it down into a for loop.
return [
{k: ("***" if k in pii_fields else v) for k, v in record.items()}
for record in data
diff --git a/obfuscator/read.py b/obfuscator/read.py
new file mode 100644
index 0000000..b704643
--- /dev/null
+++ b/obfuscator/read.py
@@ -0,0 +1,89 @@
+import csv
+import io
+import boto3
+import os
+from typing import List, Dict
+from obfuscator.logger import get_logger
+from obfuscator.utils import Utilities
+
+
+class DataReader:
+ """
+ A class to read CSV data from a local file, S3 object, or string. Near
+ the project completion, support for JSON/Parquet files will be added.
+ """
+
+ def __init__(self, log_level=None):
+ self.log_level = log_level
+ self.logger = get_logger("CSVREADER", log_level)
+
+ def read_local(self, path) -> List[Dict[str, str]]:
+ """
+ A method to read a local CSV file and return the data as a list of
+ dictionaries.
+ """
+ self.logger.debug(f"Reading local CSV from: {path}")
+
+ try:
+ with open(path, mode="r", encoding="utf-8") as f:
+ reader = csv.DictReader(f)
+ return [dict(row) for row in reader]
+ except FileNotFoundError:
+ self.logger.error(f"File not found: {path}")
+ raise
+ except Exception as e:
+ self.logger.error(f"Error reading file: {e}")
+
+ def read_s3(self, path) -> List[Dict[str, str]]:
+ """
+ A method to read an S3 object containing CSV data
+ and return the data as a list of dictionaries.
+ """
+ utils = Utilities(self.log_level)
+ bucket, key = utils.get_s3_path(path)
+ self.logger.debug(f"Reading S3 CSV from: {bucket}/{key}")
+
+ if os.getenv("LOCALSTACK", "FALSE").upper() == "TRUE":
+ localstack_endpoint = "http://localhost.localstack.cloud:4566"
+ self.logger.debug(
+ "Using LocalStack endpoint for S3 - ensure LocalStack is running"
+ )
+ client = boto3.client(
+ "s3",
+ endpoint_url=localstack_endpoint,
+ aws_access_key_id="dummy",
+ aws_secret_access_key="dummy",
+ )
+ self.logger.debug(f"endpoint_url: {localstack_endpoint}")
+ else:
+ client = boto3.client("s3")
+
+ try:
+ response = client.get_object(Bucket=bucket, Key=key)
+ self.logger.info("S3 object read successfully")
+ content = response["Body"].read().decode("utf-8")
+ return self.read_string(content)
+ except client.exceptions.NoSuchKey:
+ self.logger.error(f"Object not found: {bucket}/{key}")
+ raise
+ except client.exceptions.ClientError as e:
+ self.logger.error(f"Error reading S3 object: {e}")
+ raise
+ except UnicodeDecodeError as e:
+ self.logger.error(f"Error decoding S3 object: {e}")
+ raise
+ except Exception as e:
+ self.logger.error(f"Error reading S3 object: {e}")
+ raise
+
+ def read_string(self, content: str) -> List[Dict[str, str]]:
+ """
+ A method to read CSV data from a string and return the data as a list
+ of dictionaries.
+ """
+ if not content.strip():
+ return []
+
+ f = io.StringIO(content)
+ reader = csv.DictReader(f)
+ return [dict(row) for row in reader]
diff --git a/obfuscator/utils.py b/obfuscator/utils.py
index 2e4211f..77ca1cf 100644
--- a/obfuscator/utils.py
+++ b/obfuscator/utils.py
@@ -1,15 +1,16 @@
# Utility functions
from obfuscator.logger import get_logger
-# Create the logger
-logger = get_logger("CLI")
+class Utilities:
+ def __init__(self, logger=None):
+ self.logger = get_logger("UTILITIES", logger)
-def get_s3_path(uri):
- parts = uri.replace("s3://", "").split("/")
- logger.debug(f"Parts: {parts}")
- bucket = parts.pop(0)
- logger.debug(f"Bucket: {bucket}")
- key = "/".join(parts)
- logger.debug(f"Key: {key}")
- return bucket, key
+ def get_s3_path(self, uri):
+ parts = uri.replace("s3://", "").split("/")
+ self.logger.debug(f"Parts: {parts}")
+ bucket = parts.pop(0)
+ self.logger.debug(f"Bucket: {bucket}")
+ key = "/".join(parts)
+ self.logger.debug(f"Key: {key}")
+ return bucket, key
diff --git a/obfuscator/write.py b/obfuscator/write.py
new file mode 100644
index 0000000..451b073
--- /dev/null
+++ b/obfuscator/write.py
@@ -0,0 +1,28 @@
+import csv
+import io
+from typing import List, Dict
+from obfuscator.logger import get_logger
+
+logger = get_logger("CSVWRITER")
+
+
+class DataWriter:
+ def __init__(self):
+ pass
+
+ def create_byte_stream(self, data: List[Dict[str, str]]) -> bytes:
+ if not data:
+ logger.error("Invalid or empty data was provided to write")
+ return b""
+
+ output = io.StringIO()
+
+ headers = list(data[0].keys())
+
+ writer = csv.DictWriter(output, fieldnames=headers)
+ writer.writeheader()
+ writer.writerows(data)
+
+ csv_string = output.getvalue()
+
+ return csv_string.encode("utf-8")
git.ajschof.me — hosted by ajschofield — powered by cgit