94 lines
3.1 KiB
Python
94 lines
3.1 KiB
Python
from django.contrib import admin
|
|
from django.contrib import messages
|
|
|
|
from .models import IngestionError, IngestionRun
|
|
from .tasks import trigger_full_sync, trigger_incremental_sync
|
|
|
|
|
|
class IngestionErrorInline(admin.TabularInline):
|
|
model = IngestionError
|
|
extra = 0
|
|
readonly_fields = ("provider_namespace", "entity_type", "external_id", "severity", "message", "occurred_at")
|
|
|
|
|
|
@admin.register(IngestionRun)
|
|
class IngestionRunAdmin(admin.ModelAdmin):
|
|
list_display = (
|
|
"provider_namespace",
|
|
"job_type",
|
|
"status",
|
|
"records_processed",
|
|
"records_failed",
|
|
"started_at",
|
|
"finished_at",
|
|
)
|
|
list_filter = ("provider_namespace", "job_type", "status")
|
|
search_fields = ("provider_namespace",)
|
|
inlines = (IngestionErrorInline,)
|
|
readonly_fields = (
|
|
"provider_namespace",
|
|
"job_type",
|
|
"status",
|
|
"triggered_by",
|
|
"started_at",
|
|
"finished_at",
|
|
"records_processed",
|
|
"records_created",
|
|
"records_updated",
|
|
"records_failed",
|
|
"context",
|
|
"raw_payload",
|
|
"created_at",
|
|
)
|
|
actions = (
|
|
"enqueue_full_sync_mvp",
|
|
"enqueue_incremental_sync_mvp",
|
|
"retry_selected_runs",
|
|
)
|
|
|
|
@admin.action(description="Queue full MVP sync")
|
|
def enqueue_full_sync_mvp(self, request, queryset):
|
|
trigger_full_sync.delay(provider_namespace="mvp_demo", triggered_by_id=request.user.id)
|
|
self.message_user(request, "Queued full MVP sync task.", level=messages.SUCCESS)
|
|
|
|
@admin.action(description="Queue incremental MVP sync")
|
|
def enqueue_incremental_sync_mvp(self, request, queryset):
|
|
trigger_incremental_sync.delay(provider_namespace="mvp_demo", triggered_by_id=request.user.id)
|
|
self.message_user(request, "Queued incremental MVP sync task.", level=messages.SUCCESS)
|
|
|
|
@admin.action(description="Retry selected ingestion runs")
|
|
def retry_selected_runs(self, request, queryset):
|
|
count = 0
|
|
for run in queryset:
|
|
if run.job_type == IngestionRun.JobType.INCREMENTAL:
|
|
trigger_incremental_sync.delay(
|
|
provider_namespace=run.provider_namespace,
|
|
triggered_by_id=request.user.id,
|
|
context={"retry_of": run.id},
|
|
)
|
|
else:
|
|
trigger_full_sync.delay(
|
|
provider_namespace=run.provider_namespace,
|
|
triggered_by_id=request.user.id,
|
|
context={"retry_of": run.id},
|
|
)
|
|
count += 1
|
|
self.message_user(request, f"Queued {count} retry task(s).", level=messages.SUCCESS)
|
|
|
|
|
|
@admin.register(IngestionError)
|
|
class IngestionErrorAdmin(admin.ModelAdmin):
|
|
list_display = ("provider_namespace", "entity_type", "external_id", "severity", "occurred_at")
|
|
list_filter = ("severity", "provider_namespace")
|
|
search_fields = ("entity_type", "external_id", "message")
|
|
readonly_fields = (
|
|
"ingestion_run",
|
|
"provider_namespace",
|
|
"entity_type",
|
|
"external_id",
|
|
"severity",
|
|
"message",
|
|
"raw_payload",
|
|
"occurred_at",
|
|
)
|