feat(v2): add LBA snapshot extractor and command
This commit is contained in:
61
apps/ingestion/management/commands/run_lba_extractor.py
Normal file
61
apps/ingestion/management/commands/run_lba_extractor.py
Normal file
@ -0,0 +1,61 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from django.core.management.base import BaseCommand, CommandError
|
||||
from django.utils.dateparse import parse_date
|
||||
|
||||
from apps.ingestion.extractors import ExtractorError, create_extractor
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = "Run only the LBA extractor and emit an import-ready snapshot JSON."
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument(
|
||||
"--output-path",
|
||||
dest="output_path",
|
||||
default=None,
|
||||
help="Directory or .json path to write output (default incoming dir).",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--snapshot-date",
|
||||
dest="snapshot_date",
|
||||
default=None,
|
||||
help="Override snapshot date in YYYY-MM-DD format.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--dry-run",
|
||||
action="store_true",
|
||||
help="Validate without writing output.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--indent",
|
||||
type=int,
|
||||
default=2,
|
||||
help="JSON indent level for emitted file.",
|
||||
)
|
||||
|
||||
def handle(self, *args, **options):
|
||||
snapshot_date = None
|
||||
if options["snapshot_date"]:
|
||||
snapshot_date = parse_date(options["snapshot_date"])
|
||||
if snapshot_date is None:
|
||||
raise CommandError("--snapshot-date must be YYYY-MM-DD.")
|
||||
|
||||
try:
|
||||
extractor = create_extractor("lba")
|
||||
result = extractor.run(
|
||||
output_path=options["output_path"],
|
||||
snapshot_date=snapshot_date,
|
||||
write_output=not options["dry_run"],
|
||||
indent=options["indent"],
|
||||
)
|
||||
except ExtractorError as exc:
|
||||
raise CommandError(str(exc)) from exc
|
||||
|
||||
output = str(result.output_path) if result.output_path else "<dry-run>"
|
||||
self.stdout.write(
|
||||
self.style.SUCCESS(
|
||||
f"LBA extractor completed: source={result.source_name} "
|
||||
f"date={result.snapshot_date} records={result.records_count} output={output}"
|
||||
)
|
||||
)
|
||||
Reference in New Issue
Block a user