diff options
| author | Alex <git@ajschof.me> | 2025-02-19 15:58:28 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-02-19 15:58:28 +0000 |
| commit | 4066bf747e1e4c938526957c119f3f1485ee251e (patch) | |
| tree | 9a1e95f4ccbdd04e19d67a6c13641a19c4d4f3e0 /obfuscator | |
| parent | f24955044c4c05e37aba4efb505ec63b44113912 (diff) | |
| parent | 5402af2c7198a685a57a05e29a869e1e72a6b877 (diff) | |
| download | gdpr-obfuscator-4066bf747e1e4c938526957c119f3f1485ee251e.tar.gz gdpr-obfuscator-4066bf747e1e4c938526957c119f3f1485ee251e.zip | |
Merge pull request #8 from ajschofield/refining-phase
mostly minor changes (fixing things up)
Diffstat (limited to 'obfuscator')
| -rw-r--r-- | obfuscator/csv_reader.py | 97 | ||||
| -rw-r--r-- | obfuscator/csv_writer.py | 26 | ||||
| -rw-r--r-- | obfuscator/logger.py | 40 | ||||
| -rw-r--r-- | obfuscator/obfuscate.py | 14 | ||||
| -rw-r--r-- | obfuscator/read.py | 89 | ||||
| -rw-r--r-- | obfuscator/utils.py | 21 | ||||
| -rw-r--r-- | obfuscator/write.py | 28 |
7 files changed, 161 insertions, 154 deletions
diff --git a/obfuscator/csv_reader.py b/obfuscator/csv_reader.py deleted file mode 100644 index 8f4ebea..0000000 --- a/obfuscator/csv_reader.py +++ /dev/null @@ -1,97 +0,0 @@ -import csv -import io -import boto3 -import os -from typing import List, Dict -from obfuscator.logger import get_logger -from obfuscator.utils import get_s3_path - -# Create the logger -logger = get_logger("CSVReader") - -# Putting the CSV reading components into a class may seem like overkill -# for a simple script, but it allows for better organization and scalability. -# @staticmethod is used to define the method without an instance of the class -# being required. The methods could be defined just as functions, and this -# may still be changed. - - -class CSVReader: - """ - A class to read CSV data from a local file, S3 object, or string. Near - the project completion, support for JSON/Parquet files will be added. - """ - - @staticmethod - def read_local(path) -> List[Dict[str, str]]: - """ - A method to read a local CSV file and return the data as a list of - dictionaries. - """ - # Log the path of the file being read for debugging - logger.debug(f"Reading local CSV from: {path}") - - # Attempt to read the file and return the data as a list of dictionaries - # However, if the file isn't found or there is a generic exception, log - # the error and raise an exception - try: - with open(path, mode="r", encoding="utf-8") as f: - reader = csv.DictReader(f) - return [dict(row) for row in reader] - except FileNotFoundError: - logger.error(f"File not found: {path}") - raise - except Exception as e: - logger.error(f"Error reading file: {e}") - - @staticmethod - def read_s3(path) -> List[Dict[str, str]]: - """ - A method to read an S3 object containing CSV data - and return the data as a list of dictionaries. - """ - bucket, key = get_s3_path(path) - logger.debug(f"Reading S3 CSV from: {bucket}/{key}") - - # If DEBUG=TRUE, use the localstack endpoint for testing - if os.getenv("DEBUG", "FALSE").upper() == "TRUE": - localstack_endpoint = "http://localhost.localstack.cloud:4566" - logger.debug("Using LocalStack endpoint for S3") - client = boto3.client( - "s3", - endpoint_url=localstack_endpoint, - aws_access_key_id="dummy", - aws_secret_access_key="dummy", - ) - logger.debug(f"endpoint_url: {localstack_endpoint}") - else: - client = boto3.client("s3") - - try: - # Attempt to read the S3 object and return the data as a list of dictionaries - response = client.get_object(Bucket=bucket, Key=key) - logger.info("S3 object read successfully") - # Read and decode the content - content = response["Body"].read().decode("utf-8") - # Even though the read_string method was only created for testing, - # it can be reused here to read and return the CSV data - return CSVReader.read_string(content) - # TODO: Add more specific exceptions to catch - except Exception as e: - logger.error(f"Error reading S3 object: {e}") - raise - - @staticmethod - def read_string(content: str) -> List[Dict[str, str]]: - """ - A method to read CSV data from a string and return the data as a list - of dictionaries. - """ - # If the content is empty, return an empty list - if not content.strip(): - return [] - - # Treat the string as a file-like object and return as list of dictionaries - f = io.StringIO(content) - reader = csv.DictReader(f) - return [dict(row) for row in reader] diff --git a/obfuscator/csv_writer.py b/obfuscator/csv_writer.py deleted file mode 100644 index aa5ac3f..0000000 --- a/obfuscator/csv_writer.py +++ /dev/null @@ -1,26 +0,0 @@ -import csv -import io -from typing import List, Dict -from obfuscator.logger import get_logger - -# Create the logger -logger = get_logger("CSVWriter") - - -def create_byte_stream(data: List[Dict[str, str]]) -> bytes: - if not data: - logger.info("No valid data was provided to write") - return b"" - - output = io.StringIO() - - headers = list(data[0].keys()) - - writer = csv.DictWriter(output, fieldnames=headers) - writer.writeheader() - writer.writerows(data) - - csv_string = output.getvalue() - logger.debug(f"CSV data: {csv_string}") - - return csv_string.encode("utf-8") diff --git a/obfuscator/logger.py b/obfuscator/logger.py index ca41e95..140fa8f 100644 --- a/obfuscator/logger.py +++ b/obfuscator/logger.py @@ -1,24 +1,36 @@ import logging import os +from enum import Enum -def get_logger(name: str) -> logging.Logger: - logger = logging.getLogger(name) +class LogLevel(Enum): + DEBUG = logging.DEBUG + INFO = logging.INFO + WARNING = logging.WARNING + ERROR = logging.ERROR + CRITICAL = logging.CRITICAL + - if not logger.hasHandlers(): - if os.getenv("DEBUG", "FALSE").upper() == "TRUE": - log_level = logging.DEBUG - else: - log_level = logging.INFO +def get_logger(name: str, level: LogLevel = LogLevel.INFO) -> logging.Logger: + if isinstance(level, str): + try: + level = LogLevel[level.upper()] + except KeyError: + raise ValueError( + f"Invalid log level '{level}'. Choose from: {', '.join(l.name for l in LogLevel)}" + ) - logger.setLevel(log_level) + logger = logging.getLogger(name) - handler = logging.StreamHandler() - formatting = logging.Formatter( - "%(asctime)s - %(levelname)s - %(name)s - %(message)s" - ) - handler.setFormatter(formatting) + if logger.hasHandlers(): + logger.handlers.clear() - logger.addHandler(handler) + handler = logging.StreamHandler() + logger.setLevel(level.value) + formatting = logging.Formatter( + "[%(asctime)s] - %(levelname)s::%(name)s - %(message)s" + ) + handler.setFormatter(formatting) + logger.addHandler(handler) return logger diff --git a/obfuscator/obfuscate.py b/obfuscator/obfuscate.py index 3f589cb..cd12b6d 100644 --- a/obfuscator/obfuscate.py +++ b/obfuscator/obfuscate.py @@ -1,8 +1,7 @@ from typing import List, Dict from obfuscator.logger import get_logger -# Create the logger -logger = get_logger("Obfuscator") +logger = get_logger("OBFUSCATE") def obfuscate( @@ -12,14 +11,15 @@ def obfuscate( A function to obfuscate PII fields in a list of dictionaries, replacing sensitive values with a string of asterisks. """ - # If no data is provided, log a message and return an empty list if not data: - logger.info("No valid data was provided to obfuscate") + logger.error( + "Invalid or empty data was provided to obfuscate. Returning empty list." + ) return [] + if not pii_fields: + logger.error("No PII fields provided to obfuscate. Returning data unchanged.") + return data - # Obfuscate the PII fields in each record using a list/dict comprehension - # This code is good but makes debugging a bit tricky. I may consider - # breaking it down into a for loop. return [ {k: ("***" if k in pii_fields else v) for k, v in record.items()} for record in data diff --git a/obfuscator/read.py b/obfuscator/read.py new file mode 100644 index 0000000..b704643 --- /dev/null +++ b/obfuscator/read.py @@ -0,0 +1,89 @@ +import csv +import io +import boto3 +import os +from typing import List, Dict +from obfuscator.logger import get_logger +from obfuscator.utils import Utilities + + +class DataReader: + """ + A class to read CSV data from a local file, S3 object, or string. Near + the project completion, support for JSON/Parquet files will be added. + """ + + def __init__(self, log_level=None): + self.log_level = log_level + self.logger = get_logger("CSVREADER", log_level) + + def read_local(self, path) -> List[Dict[str, str]]: + """ + A method to read a local CSV file and return the data as a list of + dictionaries. + """ + self.logger.debug(f"Reading local CSV from: {path}") + + try: + with open(path, mode="r", encoding="utf-8") as f: + reader = csv.DictReader(f) + return [dict(row) for row in reader] + except FileNotFoundError: + self.logger.error(f"File not found: {path}") + raise + except Exception as e: + self.logger.error(f"Error reading file: {e}") + + def read_s3(self, path) -> List[Dict[str, str]]: + """ + A method to read an S3 object containing CSV data + and return the data as a list of dictionaries. + """ + utils = Utilities(self.log_level) + bucket, key = utils.get_s3_path(path) + self.logger.debug(f"Reading S3 CSV from: {bucket}/{key}") + + if os.getenv("LOCALSTACK", "FALSE").upper() == "TRUE": + localstack_endpoint = "http://localhost.localstack.cloud:4566" + self.logger.debug( + "Using LocalStack endpoint for S3 - ensure LocalStack is running" + ) + client = boto3.client( + "s3", + endpoint_url=localstack_endpoint, + aws_access_key_id="dummy", + aws_secret_access_key="dummy", + ) + self.logger.debug(f"endpoint_url: {localstack_endpoint}") + else: + client = boto3.client("s3") + + try: + response = client.get_object(Bucket=bucket, Key=key) + self.logger.info("S3 object read successfully") + content = response["Body"].read().decode("utf-8") + return self.read_string(content) + except client.exceptions.NoSuchKey: + self.logger.error(f"Object not found: {bucket}/{key}") + raise + except client.exceptions.ClientError as e: + self.logger.error(f"Error reading S3 object: {e}") + raise + except UnicodeDecodeError as e: + self.logger.error(f"Error decoding S3 object: {e}") + raise + except Exception as e: + self.logger.error(f"Error reading S3 object: {e}") + raise + + def read_string(self, content: str) -> List[Dict[str, str]]: + """ + A method to read CSV data from a string and return the data as a list + of dictionaries. + """ + if not content.strip(): + return [] + + f = io.StringIO(content) + reader = csv.DictReader(f) + return [dict(row) for row in reader] diff --git a/obfuscator/utils.py b/obfuscator/utils.py index 2e4211f..77ca1cf 100644 --- a/obfuscator/utils.py +++ b/obfuscator/utils.py @@ -1,15 +1,16 @@ # Utility functions from obfuscator.logger import get_logger -# Create the logger -logger = get_logger("CLI") +class Utilities: + def __init__(self, logger=None): + self.logger = get_logger("UTILITIES", logger) -def get_s3_path(uri): - parts = uri.replace("s3://", "").split("/") - logger.debug(f"Parts: {parts}") - bucket = parts.pop(0) - logger.debug(f"Bucket: {bucket}") - key = "/".join(parts) - logger.debug(f"Key: {key}") - return bucket, key + def get_s3_path(self, uri): + parts = uri.replace("s3://", "").split("/") + self.logger.debug(f"Parts: {parts}") + bucket = parts.pop(0) + self.logger.debug(f"Bucket: {bucket}") + key = "/".join(parts) + self.logger.debug(f"Key: {key}") + return bucket, key diff --git a/obfuscator/write.py b/obfuscator/write.py new file mode 100644 index 0000000..451b073 --- /dev/null +++ b/obfuscator/write.py @@ -0,0 +1,28 @@ +import csv +import io +from typing import List, Dict +from obfuscator.logger import get_logger + +logger = get_logger("CSVWRITER") + + +class DataWriter: + def __init__(self): + pass + + def create_byte_stream(self, data: List[Dict[str, str]]) -> bytes: + if not data: + logger.error("Invalid or empty data was provided to write") + return b"" + + output = io.StringIO() + + headers = list(data[0].keys()) + + writer = csv.DictWriter(output, fieldnames=headers) + writer.writeheader() + writer.writerows(data) + + csv_string = output.getvalue() + + return csv_string.encode("utf-8") |
