blob: 1fb1e300b36be873a9c4c434e91231a148aa4305 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
import csv
import io
from typing import Dict, List, Optional

from obfuscator.logger import get_logger
logger = get_logger("CSVReader")
class CSVReader:
    """Read CSV data into a list of row dictionaries.

    Every reader is a static method, so the class acts as a namespace and
    does not need to be instantiated. Rows are returned as plain ``dict``
    objects keyed by the names found in the CSV header line.
    """

    def __init__(self):
        """Create a reader; no instance state is kept."""

    @staticmethod
    def read_local(path) -> List[Dict[str, str]]:
        """Read a CSV file from the local filesystem.

        Errors are logged instead of raised, so the caller always gets a
        list back.

        Args:
            path: Location of the CSV file, passed straight to ``open``.

        Returns:
            One dict per data row. The list is empty when the file is
            missing, and holds only the rows read so far when reading
            fails part-way through.
        """
        logger.debug(f"Reading local CSV from: {path}")
        data: List[Dict[str, str]] = []
        try:
            # newline="" hands line-ending handling to the csv module, which
            # keeps "\r\n" inside quoted fields intact instead of letting
            # universal-newline translation rewrite it first.
            with open(path, mode="r", encoding="utf-8", newline="") as file:
                # Append row by row (rather than building the list in one
                # expression) so rows parsed before a mid-file failure are
                # still returned.
                for row in csv.DictReader(file):
                    data.append(dict(row))
        except FileNotFoundError:
            logger.error(f"File not found: {path}")
        except Exception as e:
            # Best-effort reader: report the problem and return what we have.
            logger.error(f"Error reading file: {e}")
        logger.debug(f"Total rows read: {len(data)}")
        return data

    @staticmethod
    def read_s3(path) -> List[Dict[str, str]]:
        """Placeholder for reading a CSV from S3.

        Not implemented: ``path`` is ignored and an empty list is returned.
        """
        return []

    @staticmethod
    def read_string(self=None, content: Optional[str] = None) -> List[Dict[str, str]]:
        """Parse CSV text held in memory.

        This is a static method that historically also declared ``self``,
        which forced callers to pass a dummy first argument. Both call
        shapes are accepted so existing callers keep working:

        * ``CSVReader.read_string(text)`` / ``read_string(content=text)``
        * ``CSVReader.read_string(placeholder, text)`` (legacy form)

        Args:
            self: Legacy placeholder. When ``content`` is omitted this
                argument is taken to be the CSV text.
            content: CSV text whose first line is the header.

        Returns:
            One dict per data row; an empty list when the text is empty or
            contains only whitespace.

        Raises:
            TypeError: If no CSV text was supplied as a ``str``.
        """
        if content is None:
            # Single-argument call: the text arrived in the first slot.
            content = self
        if not isinstance(content, str):
            raise TypeError("read_string() requires the CSV content as a str")
        if not content.strip():
            return []
        reader = csv.DictReader(io.StringIO(content))
        return [dict(row) for row in reader]
|