131 lines
4.0 KiB
Python
131 lines
4.0 KiB
Python
import hashlib
|
|
from contextlib import contextmanager
|
|
|
|
from django.db import connection
|
|
from django.utils import timezone
|
|
|
|
from apps.ingestion.models import IngestionError, IngestionRun
|
|
|
|
|
|
def start_ingestion_run(
    *,
    provider_namespace: str,
    job_type: str,
    triggered_by=None,
    context: dict | None = None,
) -> IngestionRun:
    """Create and persist a new ingestion run in the RUNNING state.

    Args:
        provider_namespace: Provider namespace recorded on the run.
        job_type: Job type recorded on the run.
        triggered_by: Optional value stored on the run's ``triggered_by`` field.
        context: Optional initial context; any falsy value is stored as ``{}``.

    Returns:
        The newly created ``IngestionRun`` with ``started_at`` set to the
        current time.
    """
    initial_context = context if context else {}
    started = timezone.now()
    return IngestionRun.objects.create(
        provider_namespace=provider_namespace,
        job_type=job_type,
        status=IngestionRun.RunStatus.RUNNING,
        triggered_by=triggered_by,
        started_at=started,
        context=initial_context,
    )
|
|
|
|
|
|
def finish_ingestion_run(
    *,
    run: IngestionRun,
    status: str,
    processed: int = 0,
    created: int = 0,
    updated: int = 0,
    failed: int = 0,
    error_summary: str = "",
) -> IngestionRun:
    """Record the final outcome of ``run`` and persist it.

    Args:
        run: The run to finalize; it is mutated in place.
        status: Final status value, stored as given (not validated here).
        processed: Value for ``records_processed``.
        created: Value for ``records_created``.
        updated: Value for ``records_updated``.
        failed: Value for ``records_failed``.
        error_summary: Value for ``error_summary``.

    Returns:
        The same ``run`` instance, with ``finished_at`` set to the current time.
    """
    # Insertion order here is also the order of ``update_fields`` below.
    final_values = {
        "status": status,
        "records_processed": processed,
        "records_created": created,
        "records_updated": updated,
        "records_failed": failed,
        "error_summary": error_summary,
        "finished_at": timezone.now(),
    }
    for field_name, value in final_values.items():
        setattr(run, field_name, value)
    # Only the outcome columns are written; other columns keep their DB values.
    run.save(update_fields=list(final_values))
    return run
|
|
|
|
|
|
def mark_ingestion_run_skipped(
    *,
    provider_namespace: str,
    job_type: str,
    reason: str,
    context: dict | None = None,
) -> IngestionRun:
    """Persist an already-finished run recording that a job did not execute.

    The run is stored with CANCELED status. ``started_at`` and
    ``finished_at`` hold the same timestamp, and ``reason`` is stored in
    ``error_summary``. A falsy ``context`` is stored as ``{}``.

    Returns:
        The newly created ``IngestionRun``.
    """
    timestamp = timezone.now()
    return IngestionRun.objects.create(
        provider_namespace=provider_namespace,
        job_type=job_type,
        status=IngestionRun.RunStatus.CANCELED,
        started_at=timestamp,
        finished_at=timestamp,
        error_summary=reason,
        context=context if context else {},
    )
|
|
|
|
|
|
def _build_ingestion_lock_key(*, provider_namespace: str, job_type: str) -> int:
|
|
digest = hashlib.blake2b(
|
|
f"{provider_namespace}:{job_type}".encode("utf-8"),
|
|
digest_size=8,
|
|
).digest()
|
|
return int.from_bytes(digest, byteorder="big", signed=True)
|
|
|
|
|
|
def try_acquire_ingestion_lock(*, provider_namespace: str, job_type: str) -> tuple[bool, int]:
    """Try to take the provider/job advisory lock without blocking.

    Uses ``pg_try_advisory_lock``, which returns immediately instead of
    waiting for the lock.

    Returns:
        ``(acquired, lock_key)``: whether the lock was obtained, and the key
        that must later be passed to ``release_ingestion_lock``.
    """
    key = _build_ingestion_lock_key(
        provider_namespace=provider_namespace,
        job_type=job_type,
    )
    with connection.cursor() as cursor:
        cursor.execute("SELECT pg_try_advisory_lock(%s);", [key])
        row = cursor.fetchone()
        locked = row[0]
    return bool(locked), key
|
|
|
|
|
|
def release_ingestion_lock(*, lock_key: int) -> None:
    """Release the advisory lock identified by ``lock_key``.

    The boolean returned by ``pg_advisory_unlock`` is not inspected.
    PostgreSQL only releases a session-level lock held by the same database
    session, so this should run on the connection that acquired it.
    """
    unlock_sql = "SELECT pg_advisory_unlock(%s);"
    with connection.cursor() as cursor:
        cursor.execute(unlock_sql, [lock_key])
|
|
|
|
|
|
@contextmanager
def ingestion_advisory_lock(*, provider_namespace: str, job_type: str):
    """Context manager that attempts the provider/job advisory lock without blocking.

    Yields ``True`` when the lock was obtained and ``False`` otherwise. The
    body runs in both cases, so callers must check the yielded value. The
    lock is released on exit (including on exceptions) only if it was
    acquired.
    """
    got_lock, key = try_acquire_ingestion_lock(
        provider_namespace=provider_namespace,
        job_type=job_type,
    )
    if not got_lock:
        # Nothing to release; just hand the failure flag to the caller.
        yield got_lock
        return
    try:
        yield got_lock
    finally:
        release_ingestion_lock(lock_key=key)
|
|
|
|
|
|
def update_ingestion_run_progress(
    *,
    run: IngestionRun,
    completed_step: str,
    step_summary: dict,
    source_counts: dict | None = None,
) -> IngestionRun:
    """Record a finished step in ``run.context`` and persist only that field.

    ``completed_step`` is appended to ``context["completed_steps"]``, even
    if it is already present. ``step_summary`` is stored under
    ``context["step_summaries"][completed_step]``. When ``source_counts`` is
    not ``None`` it replaces ``context["source_counts"]``.

    The existing context and its nested collections are copied rather than
    mutated in place, then the new dict is assigned back to ``run.context``.

    Returns:
        The same ``run`` instance after saving.
    """
    new_context = dict(run.context or {})

    steps = [*(new_context.get("completed_steps") or [])]
    steps.append(completed_step)
    new_context["completed_steps"] = steps

    summaries = dict(new_context.get("step_summaries") or {})
    summaries[completed_step] = step_summary
    new_context["step_summaries"] = summaries

    if source_counts is not None:
        new_context["source_counts"] = source_counts

    run.context = new_context
    run.save(update_fields=["context"])
    return run
|
|
|
|
|
|
def log_ingestion_error(
    *,
    run: IngestionRun,
    message: str,
    provider_namespace: str,
    severity: str = IngestionError.Severity.ERROR,
    entity_type: str = "",
    external_id: str = "",
    raw_payload: dict | None = None,
) -> IngestionError:
    """Persist an ``IngestionError`` row linked to ``run``.

    Args:
        run: Run the error belongs to (stored as ``ingestion_run``).
        message: Error message text.
        provider_namespace: Stored as given. NOTE(review): it is not checked
            against ``run.provider_namespace``.
        severity: Severity value; defaults to ``IngestionError.Severity.ERROR``.
        entity_type: Optional entity type label.
        external_id: Optional external identifier of the failing record.
        raw_payload: Optional payload; any falsy value is stored as ``{}``.

    Returns:
        The newly created ``IngestionError``.
    """
    payload = raw_payload if raw_payload else {}
    return IngestionError.objects.create(
        ingestion_run=run,
        provider_namespace=provider_namespace,
        message=message,
        severity=severity,
        entity_type=entity_type,
        external_id=external_id,
        raw_payload=payload,
    )
|