blob: fe0ef42e2b566203b651948ab64f9ab0a8687bfc (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
import csv
import io
import logging
import Enum
from typing import List, Dict
class Utilities:
class LogLevel(Enum):
DEBUG = logging.DEBUG
INFO = logging.INFO
WARNING = logging.WARNING
ERROR = logging.ERROR
CRITICAL = logging.CRITICAL
@staticmethod
def get_logger(name: str, level: "Utilities.LogLevel" = None) -> logging.Logger:
level = level or Utilities.LogLevel.INFO
logger = logging.getLogger(name)
if logger.hasHandlers():
logger.handlers.clear()
handler = logging.StreamHandler()
logger.setLevel(level.value)
formatter = logging.Formatter(
"[%(asctime)s] - %(levelname)s::%(name)s - %(message)s"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def __init__(self, logger=None):
self.logger = self.get_logger(__name__, logger)
def get_s3_path(self, uri):
parts = uri.replace("s3://", "").split("/")
self.logger.debug(f"Parts: {parts}")
bucket = parts.pop(0)
self.logger.debug(f"Bucket: {bucket}")
key = "/".join(parts)
self.logger.debug(f"Key: {key}")
return bucket, key
def create_byte_stream(self, data: List[Dict[str, str]]) -> bytes:
if not data:
self.logger.error("Invalid or empty data was provided to write")
return b""
output = io.StringIO()
headers = list(data[0].keys())
writer = csv.DictWriter(output, fieldnames=headers)
writer.writeheader()
writer.writerows(data)
csv_string = output.getvalue()
return csv_string.encode("utf-8")
|