Files
webfortilog/app/services/exporter.py
2026-04-27 11:44:40 +02:00

108 lines
3.2 KiB
Python

import csv
import io
from dataclasses import dataclass
from pathlib import Path
from typing import Sequence, TextIO
from app.constants import VENDOR_FIELDS
@dataclass(slots=True)
class ExportResult:
columns: list[str]
output_format: str
preview_text: str
def preview(self, _record_limit: int) -> str:
"""Return the preview that was collected during export writing."""
return self.preview_text
def write_export(
    file_path: Path,
    records: Sequence[dict[str, str]],
    union_keys: list[str],
    mode: str,
    output_format: str,
    preview_record_limit: int,
) -> ExportResult:
    """Write the final export directly to disk and keep only a small preview in memory.

    Args:
        file_path: Destination file; it is created or truncated.
        records: Records to export, one mapping per record.
        union_keys: Column names used when ``mode`` is not ``"vendor"``.
        mode: ``"vendor"`` selects ``VENDOR_FIELDS`` as the columns; any other
            value selects ``union_keys``.
        output_format: ``"text"`` writes the plain-text layout; any other
            value writes CSV.
        preview_record_limit: Number of leading records included in the preview.

    Returns:
        An ``ExportResult`` holding the columns, the format and the preview text.
    """
    # Copy the column list so the returned result never aliases the shared
    # VENDOR_FIELDS constant or the caller's union_keys list.
    columns = list(VENDOR_FIELDS if mode == "vendor" else union_keys)
    write_records = _write_text if output_format == "text" else _write_csv

    # newline="" disables newline translation, so the csv writer's own line
    # terminators and the text writer's "\n" reach the file unchanged.
    with file_path.open("w", encoding="utf-8", newline="") as export_file:
        preview_text = write_records(
            export_file=export_file,
            records=records,
            columns=columns,
            preview_record_limit=preview_record_limit,
        )

    return ExportResult(
        columns=columns,
        output_format=output_format,
        preview_text=preview_text,
    )
def _write_text(
    export_file: TextIO,
    records: Sequence[dict[str, str]],
    columns: list[str],
    preview_record_limit: int,
) -> str:
    """Write records as aligned ``key = value`` blocks and return the preview.

    Each record becomes a ``--- record N ---`` header followed by one line per
    column, with column names padded to the longest name. Lines belonging to
    the first ``preview_record_limit`` records are also collected and returned
    joined by ``"\\n"``.
    """
    key_width = max(map(len, columns), default=0)
    collected: list[str] = []
    has_output = False

    for number, record in enumerate(records, start=1):
        # Build the complete block for this record first, then emit it.
        block = [f"--- record {number} ---"]
        block.extend(
            f" {name.ljust(key_width)} = {record.get(name, '')}" for name in columns
        )
        for text in block:
            has_output = _write_line(export_file, text, has_output)
        if number <= preview_record_limit:
            collected.extend(block)

    return "\n".join(collected)
def _write_csv(
export_file: TextIO,
records: Sequence[dict[str, str]],
columns: list[str],
preview_record_limit: int,
) -> str:
writer = csv.DictWriter(export_file, fieldnames=columns, extrasaction="ignore")
writer.writeheader()
preview_buffer = io.StringIO()
preview_writer = csv.DictWriter(preview_buffer, fieldnames=columns, extrasaction="ignore")
preview_writer.writeheader()
for index, record in enumerate(records, start=1):
row = {column: record.get(column, "") for column in columns}
writer.writerow(row)
if index <= preview_record_limit:
preview_writer.writerow(row)
return preview_buffer.getvalue().rstrip("\n")
def _write_line(export_file: TextIO, line: str, wrote_line: bool) -> bool:
"""Write lines without leaving a trailing newline at the end of the file."""
if wrote_line:
export_file.write("\n")
export_file.write(line)
return True