import csv import io from dataclasses import dataclass from pathlib import Path from typing import Sequence, TextIO from app.constants import VENDOR_FIELDS @dataclass(slots=True) class ExportResult: columns: list[str] output_format: str preview_text: str def preview(self, _record_limit: int) -> str: """Return the preview that was collected during export writing.""" return self.preview_text def write_export( file_path: Path, records: Sequence[dict[str, str]], union_keys: list[str], mode: str, output_format: str, preview_record_limit: int, ) -> ExportResult: """Write the final export directly to disk and keep only a small preview in memory.""" columns = VENDOR_FIELDS if mode == "vendor" else union_keys with file_path.open("w", encoding="utf-8", newline="") as export_file: if output_format == "text": preview_text = _write_text( export_file=export_file, records=records, columns=columns, preview_record_limit=preview_record_limit, ) else: preview_text = _write_csv( export_file=export_file, records=records, columns=columns, preview_record_limit=preview_record_limit, ) return ExportResult( columns=columns, output_format=output_format, preview_text=preview_text, ) def _write_text( export_file: TextIO, records: Sequence[dict[str, str]], columns: list[str], preview_record_limit: int, ) -> str: max_key_length = max((len(column) for column in columns), default=0) preview_lines: list[str] = [] wrote_line = False for index, record in enumerate(records, start=1): header = f"--- record {index} ---" wrote_line = _write_line(export_file, header, wrote_line) if index <= preview_record_limit: preview_lines.append(header) for column in columns: line = f" {column.ljust(max_key_length)} = {record.get(column, '')}" wrote_line = _write_line(export_file, line, wrote_line) if index <= preview_record_limit: preview_lines.append(line) return "\n".join(preview_lines) def _write_csv( export_file: TextIO, records: Sequence[dict[str, str]], columns: list[str], preview_record_limit: int, ) -> str: writer = csv.DictWriter(export_file, fieldnames=columns, extrasaction="ignore") writer.writeheader() preview_buffer = io.StringIO() preview_writer = csv.DictWriter(preview_buffer, fieldnames=columns, extrasaction="ignore") preview_writer.writeheader() for index, record in enumerate(records, start=1): row = {column: record.get(column, "") for column in columns} writer.writerow(row) if index <= preview_record_limit: preview_writer.writerow(row) return preview_buffer.getvalue().rstrip("\n") def _write_line(export_file: TextIO, line: str, wrote_line: bool) -> bool: """Write lines without leaving a trailing newline at the end of the file.""" if wrote_line: export_file.write("\n") export_file.write(line) return True