"""Django admin registrations for ingestion runs and ingestion errors."""

from django.contrib import admin
from django.contrib import messages
from django.db.models import Count

from apps.providers.registry import get_default_provider_namespace

from .models import IngestionError, IngestionRun
from .tasks import trigger_full_sync, trigger_incremental_sync


class IngestionErrorInline(admin.TabularInline):
    """Tabular inline for ``IngestionError`` rows with every listed field read-only."""

    model = IngestionError
    extra = 0
    readonly_fields = (
        "provider_namespace",
        "entity_type",
        "external_id",
        "severity",
        "message",
        "occurred_at",
    )


@admin.register(IngestionRun)
class IngestionRunAdmin(admin.ModelAdmin):
    """Admin for ``IngestionRun`` with actions that queue sync tasks.

    The changelist queryset is annotated with ``_error_count`` so the
    ``error_count`` column can be displayed and used for ordering.
    """

    list_display = (
        "provider_namespace",
        "job_type",
        "status",
        "records_processed",
        "records_created",
        "records_updated",
        "records_failed",
        "error_count",
        "short_error_summary",
        "started_at",
        "finished_at",
    )
    list_filter = ("provider_namespace", "job_type", "status")
    search_fields = ("provider_namespace",)
    inlines = (IngestionErrorInline,)
    readonly_fields = (
        "provider_namespace",
        "job_type",
        "status",
        "triggered_by",
        "started_at",
        "finished_at",
        "records_processed",
        "records_created",
        "records_updated",
        "records_failed",
        "error_summary",
        "context",
        "raw_payload",
        "created_at",
    )
    actions = (
        "enqueue_full_sync_default_provider",
        "enqueue_incremental_sync_default_provider",
        "retry_selected_runs",
    )

    @admin.action(description="Queue full sync (default provider)")
    def enqueue_full_sync_default_provider(self, request, queryset):
        """Queue one full sync for the default provider namespace.

        The selected rows in ``queryset`` are not consulted; the task is
        attributed to the requesting user via ``triggered_by_id``.
        """
        provider_namespace = get_default_provider_namespace()
        trigger_full_sync.delay(
            provider_namespace=provider_namespace,
            triggered_by_id=request.user.id,
        )
        self.message_user(
            request,
            f"Queued full sync task for {provider_namespace}.",
            level=messages.SUCCESS,
        )

    @admin.action(description="Queue incremental sync (default provider)")
    def enqueue_incremental_sync_default_provider(self, request, queryset):
        """Queue one incremental sync for the default provider namespace.

        The selected rows in ``queryset`` are not consulted; the task is
        attributed to the requesting user via ``triggered_by_id``.
        """
        provider_namespace = get_default_provider_namespace()
        trigger_incremental_sync.delay(
            provider_namespace=provider_namespace,
            triggered_by_id=request.user.id,
        )
        self.message_user(
            request,
            f"Queued incremental sync task for {provider_namespace}.",
            level=messages.SUCCESS,
        )

    @admin.action(description="Retry selected ingestion runs")
    def retry_selected_runs(self, request, queryset):
        """Queue a new sync task for each selected run and report the total.

        Runs whose ``job_type`` is ``IngestionRun.JobType.INCREMENTAL`` are
        retried as incremental syncs; every other job type is retried as a
        full sync. Each task receives ``{"retry_of": run.id}`` as context.
        """
        count = 0
        for count, run in enumerate(queryset, start=1):
            # Choose the task first so both branches share one dispatch call.
            if run.job_type == IngestionRun.JobType.INCREMENTAL:
                task = trigger_incremental_sync
            else:
                task = trigger_full_sync
            task.delay(
                provider_namespace=run.provider_namespace,
                triggered_by_id=request.user.id,
                context={"retry_of": run.id},
            )
        self.message_user(
            request,
            f"Queued {count} retry task(s).",
            level=messages.SUCCESS,
        )

    def get_queryset(self, request):
        """Return the base queryset annotated with ``_error_count``."""
        # NOTE(review): assumes "errors" is the reverse relation from
        # IngestionError to IngestionRun -- confirm against the models.
        return super().get_queryset(request).annotate(_error_count=Count("errors"))

    @admin.display(ordering="_error_count", description="Errors")
    def error_count(self, obj):
        """Return the annotated error count, or 0 when the annotation is absent."""
        return getattr(obj, "_error_count", 0)

    @admin.display(description="Error summary")
    def short_error_summary(self, obj):
        """Return the error summary capped at 90 characters, or "-" if empty.

        Summaries longer than 90 characters are cut to the first 90 and
        suffixed with "...".
        """
        summary = obj.error_summary
        if not summary:
            return "-"
        if len(summary) > 90:
            return summary[:90] + "..."
        return summary


@admin.register(IngestionError)
class IngestionErrorAdmin(admin.ModelAdmin):
    """Admin for ``IngestionError`` records with all listed fields read-only."""

    list_display = (
        "provider_namespace",
        "entity_type",
        "external_id",
        "severity",
        "occurred_at",
    )
    list_filter = ("severity", "provider_namespace")
    search_fields = ("entity_type", "external_id", "message")
    readonly_fields = (
        "ingestion_run",
        "provider_namespace",
        "entity_type",
        "external_id",
        "severity",
        "message",
        "raw_payload",
        "occurred_at",
    )