108 lines
3.2 KiB
Python
108 lines
3.2 KiB
Python
import csv
|
|
import io
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Sequence, TextIO
|
|
|
|
from app.constants import VENDOR_FIELDS
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class ExportResult:
|
|
columns: list[str]
|
|
output_format: str
|
|
preview_text: str
|
|
|
|
def preview(self, _record_limit: int) -> str:
|
|
"""Return the preview that was collected during export writing."""
|
|
return self.preview_text
|
|
|
|
|
|
def write_export(
|
|
file_path: Path,
|
|
records: Sequence[dict[str, str]],
|
|
union_keys: list[str],
|
|
mode: str,
|
|
output_format: str,
|
|
preview_record_limit: int,
|
|
) -> ExportResult:
|
|
"""Write the final export directly to disk and keep only a small preview in memory."""
|
|
columns = VENDOR_FIELDS if mode == "vendor" else union_keys
|
|
|
|
with file_path.open("w", encoding="utf-8", newline="") as export_file:
|
|
if output_format == "text":
|
|
preview_text = _write_text(
|
|
export_file=export_file,
|
|
records=records,
|
|
columns=columns,
|
|
preview_record_limit=preview_record_limit,
|
|
)
|
|
else:
|
|
preview_text = _write_csv(
|
|
export_file=export_file,
|
|
records=records,
|
|
columns=columns,
|
|
preview_record_limit=preview_record_limit,
|
|
)
|
|
|
|
return ExportResult(
|
|
columns=columns,
|
|
output_format=output_format,
|
|
preview_text=preview_text,
|
|
)
|
|
|
|
|
|
def _write_text(
|
|
export_file: TextIO,
|
|
records: Sequence[dict[str, str]],
|
|
columns: list[str],
|
|
preview_record_limit: int,
|
|
) -> str:
|
|
max_key_length = max((len(column) for column in columns), default=0)
|
|
preview_lines: list[str] = []
|
|
wrote_line = False
|
|
|
|
for index, record in enumerate(records, start=1):
|
|
header = f"--- record {index} ---"
|
|
wrote_line = _write_line(export_file, header, wrote_line)
|
|
if index <= preview_record_limit:
|
|
preview_lines.append(header)
|
|
|
|
for column in columns:
|
|
line = f" {column.ljust(max_key_length)} = {record.get(column, '')}"
|
|
wrote_line = _write_line(export_file, line, wrote_line)
|
|
if index <= preview_record_limit:
|
|
preview_lines.append(line)
|
|
|
|
return "\n".join(preview_lines)
|
|
|
|
|
|
def _write_csv(
|
|
export_file: TextIO,
|
|
records: Sequence[dict[str, str]],
|
|
columns: list[str],
|
|
preview_record_limit: int,
|
|
) -> str:
|
|
writer = csv.DictWriter(export_file, fieldnames=columns, extrasaction="ignore")
|
|
writer.writeheader()
|
|
|
|
preview_buffer = io.StringIO()
|
|
preview_writer = csv.DictWriter(preview_buffer, fieldnames=columns, extrasaction="ignore")
|
|
preview_writer.writeheader()
|
|
|
|
for index, record in enumerate(records, start=1):
|
|
row = {column: record.get(column, "") for column in columns}
|
|
writer.writerow(row)
|
|
if index <= preview_record_limit:
|
|
preview_writer.writerow(row)
|
|
|
|
return preview_buffer.getvalue().rstrip("\n")
|
|
|
|
|
|
def _write_line(export_file: TextIO, line: str, wrote_line: bool) -> bool:
|
|
"""Write lines without leaving a trailing newline at the end of the file."""
|
|
if wrote_line:
|
|
export_file.write("\n")
|
|
export_file.write(line)
|
|
return True
|