"""Django admin registrations for ingestion runs and their recorded errors."""

from django.contrib import admin
from django.contrib import messages

from .models import IngestionError, IngestionRun
from .tasks import trigger_full_sync, trigger_incremental_sync

# Provider namespace passed to the sync tasks by the two "MVP" admin actions.
_MVP_PROVIDER_NAMESPACE = "mvp_demo"


class IngestionErrorInline(admin.TabularInline):
    """Tabular inline listing ``IngestionError`` rows on the run change page.

    No blank extra rows are rendered (``extra = 0``) and the fields named in
    ``readonly_fields`` are displayed read-only.
    """

    model = IngestionError
    extra = 0
    readonly_fields = (
        "provider_namespace",
        "entity_type",
        "external_id",
        "severity",
        "message",
        "occurred_at",
    )


@admin.register(IngestionRun)
class IngestionRunAdmin(admin.ModelAdmin):
    """Admin for ``IngestionRun`` with actions that enqueue sync tasks."""

    list_display = (
        "provider_namespace",
        "job_type",
        "status",
        "records_processed",
        "records_failed",
        "started_at",
        "finished_at",
    )
    list_filter = ("provider_namespace", "job_type", "status")
    search_fields = ("provider_namespace",)
    inlines = (IngestionErrorInline,)
    readonly_fields = (
        "provider_namespace",
        "job_type",
        "status",
        "triggered_by",
        "started_at",
        "finished_at",
        "records_processed",
        "records_created",
        "records_updated",
        "records_failed",
        "context",
        "raw_payload",
        "created_at",
    )
    actions = (
        "enqueue_full_sync_mvp",
        "enqueue_incremental_sync_mvp",
        "retry_selected_runs",
    )

    @admin.action(description="Queue full MVP sync")
    def enqueue_full_sync_mvp(self, request, queryset):
        """Enqueue one full sync for the MVP provider namespace.

        ``queryset`` is not consulted: a single task is enqueued per
        invocation, attributed to the requesting user.
        """
        trigger_full_sync.delay(
            provider_namespace=_MVP_PROVIDER_NAMESPACE,
            triggered_by_id=request.user.id,
        )
        self.message_user(request, "Queued full MVP sync task.", level=messages.SUCCESS)

    @admin.action(description="Queue incremental MVP sync")
    def enqueue_incremental_sync_mvp(self, request, queryset):
        """Enqueue one incremental sync for the MVP provider namespace.

        ``queryset`` is not consulted: a single task is enqueued per
        invocation, attributed to the requesting user.
        """
        trigger_incremental_sync.delay(
            provider_namespace=_MVP_PROVIDER_NAMESPACE,
            triggered_by_id=request.user.id,
        )
        self.message_user(
            request, "Queued incremental MVP sync task.", level=messages.SUCCESS
        )

    @admin.action(description="Retry selected ingestion runs")
    def retry_selected_runs(self, request, queryset):
        """Enqueue a new sync task for every selected run.

        Runs whose ``job_type`` equals ``IngestionRun.JobType.INCREMENTAL`` are
        retried with ``trigger_incremental_sync``; every other job type is
        retried with ``trigger_full_sync``. Each task receives the run's
        provider namespace, the requesting user's id and a
        ``{"retry_of": <run id>}`` context. A success message reports how many
        tasks were enqueued.
        """
        user_id = request.user.id
        count = 0
        # Fetch only the columns read below (the primary key is always
        # loaded); the remaining columns, such as ``context`` and
        # ``raw_payload``, are deferred instead of being pulled per row.
        for run in queryset.only("job_type", "provider_namespace"):
            task = (
                trigger_incremental_sync
                if run.job_type == IngestionRun.JobType.INCREMENTAL
                else trigger_full_sync
            )
            task.delay(
                provider_namespace=run.provider_namespace,
                triggered_by_id=user_id,
                context={"retry_of": run.id},
            )
            count += 1
        self.message_user(
            request, f"Queued {count} retry task(s).", level=messages.SUCCESS
        )


@admin.register(IngestionError)
class IngestionErrorAdmin(admin.ModelAdmin):
    """Admin for ``IngestionError`` rows; the listed fields are read-only."""

    list_display = (
        "provider_namespace",
        "entity_type",
        "external_id",
        "severity",
        "occurred_at",
    )
    list_filter = ("severity", "provider_namespace")
    search_fields = ("entity_type", "external_id", "message")
    readonly_fields = (
        "ingestion_run",
        "provider_namespace",
        "entity_type",
        "external_id",
        "severity",
        "message",
        "raw_payload",
        "occurred_at",
    )