Add v2 relational domain foundations with import run/file models
This commit is contained in:
@ -1,117 +1,99 @@
|
||||
from django.contrib import admin
|
||||
from django.contrib import messages
|
||||
from django.db.models import Count
|
||||
|
||||
from apps.providers.registry import get_default_provider_namespace
|
||||
|
||||
from .models import IngestionError, IngestionRun
|
||||
from .tasks import trigger_full_sync, trigger_incremental_sync
|
||||
from .models import ImportFile, ImportRun, IngestionError, IngestionRun
|
||||
|
||||
|
||||
class IngestionErrorInline(admin.TabularInline):
|
||||
model = IngestionError
|
||||
class ImportFileInline(admin.TabularInline):
|
||||
model = ImportFile
|
||||
extra = 0
|
||||
readonly_fields = ("provider_namespace", "entity_type", "external_id", "severity", "message", "occurred_at")
|
||||
|
||||
|
||||
@admin.register(IngestionRun)
|
||||
class IngestionRunAdmin(admin.ModelAdmin):
|
||||
list_display = (
|
||||
"provider_namespace",
|
||||
"job_type",
|
||||
readonly_fields = (
|
||||
"relative_path",
|
||||
"status",
|
||||
"records_processed",
|
||||
"records_created",
|
||||
"records_updated",
|
||||
"records_failed",
|
||||
"error_count",
|
||||
"short_error_summary",
|
||||
"checksum",
|
||||
"file_size_bytes",
|
||||
"rows_total",
|
||||
"rows_upserted",
|
||||
"rows_failed",
|
||||
"error_message",
|
||||
"processed_at",
|
||||
"created_at",
|
||||
)
|
||||
|
||||
|
||||
@admin.register(ImportRun)
|
||||
class ImportRunAdmin(admin.ModelAdmin):
|
||||
list_display = (
|
||||
"id",
|
||||
"source",
|
||||
"status",
|
||||
"files_total",
|
||||
"files_processed",
|
||||
"rows_total",
|
||||
"rows_upserted",
|
||||
"rows_failed",
|
||||
"started_at",
|
||||
"finished_at",
|
||||
"created_at",
|
||||
)
|
||||
list_filter = ("provider_namespace", "job_type", "status")
|
||||
search_fields = ("provider_namespace",)
|
||||
inlines = (IngestionErrorInline,)
|
||||
list_filter = ("source", "status")
|
||||
search_fields = ("source", "error_summary")
|
||||
readonly_fields = (
|
||||
"provider_namespace",
|
||||
"job_type",
|
||||
"source",
|
||||
"status",
|
||||
"triggered_by",
|
||||
"started_at",
|
||||
"finished_at",
|
||||
"records_processed",
|
||||
"records_created",
|
||||
"records_updated",
|
||||
"records_failed",
|
||||
"files_total",
|
||||
"files_processed",
|
||||
"rows_total",
|
||||
"rows_upserted",
|
||||
"rows_failed",
|
||||
"error_summary",
|
||||
"context",
|
||||
"raw_payload",
|
||||
"created_at",
|
||||
)
|
||||
actions = (
|
||||
"enqueue_full_sync_default_provider",
|
||||
"enqueue_incremental_sync_default_provider",
|
||||
"retry_selected_runs",
|
||||
inlines = (ImportFileInline,)
|
||||
|
||||
|
||||
@admin.register(ImportFile)
|
||||
class ImportFileAdmin(admin.ModelAdmin):
|
||||
list_display = (
|
||||
"id",
|
||||
"import_run",
|
||||
"relative_path",
|
||||
"status",
|
||||
"rows_total",
|
||||
"rows_upserted",
|
||||
"rows_failed",
|
||||
"processed_at",
|
||||
)
|
||||
list_filter = ("status",)
|
||||
search_fields = ("relative_path", "checksum", "error_message")
|
||||
readonly_fields = (
|
||||
"import_run",
|
||||
"relative_path",
|
||||
"status",
|
||||
"checksum",
|
||||
"file_size_bytes",
|
||||
"rows_total",
|
||||
"rows_upserted",
|
||||
"rows_failed",
|
||||
"error_message",
|
||||
"payload_preview",
|
||||
"processed_at",
|
||||
"created_at",
|
||||
)
|
||||
|
||||
@admin.action(description="Queue full sync (default provider)")
|
||||
def enqueue_full_sync_default_provider(self, request, queryset):
|
||||
provider_namespace = get_default_provider_namespace()
|
||||
trigger_full_sync.delay(provider_namespace=provider_namespace, triggered_by_id=request.user.id)
|
||||
self.message_user(request, f"Queued full sync task for {provider_namespace}.", level=messages.SUCCESS)
|
||||
|
||||
@admin.action(description="Queue incremental sync (default provider)")
|
||||
def enqueue_incremental_sync_default_provider(self, request, queryset):
|
||||
provider_namespace = get_default_provider_namespace()
|
||||
trigger_incremental_sync.delay(provider_namespace=provider_namespace, triggered_by_id=request.user.id)
|
||||
self.message_user(request, f"Queued incremental sync task for {provider_namespace}.", level=messages.SUCCESS)
|
||||
|
||||
@admin.action(description="Retry selected ingestion runs")
|
||||
def retry_selected_runs(self, request, queryset):
|
||||
count = 0
|
||||
for run in queryset:
|
||||
if run.job_type == IngestionRun.JobType.INCREMENTAL:
|
||||
trigger_incremental_sync.delay(
|
||||
provider_namespace=run.provider_namespace,
|
||||
triggered_by_id=request.user.id,
|
||||
context={"retry_of": run.id},
|
||||
)
|
||||
else:
|
||||
trigger_full_sync.delay(
|
||||
provider_namespace=run.provider_namespace,
|
||||
triggered_by_id=request.user.id,
|
||||
context={"retry_of": run.id},
|
||||
)
|
||||
count += 1
|
||||
self.message_user(request, f"Queued {count} retry task(s).", level=messages.SUCCESS)
|
||||
|
||||
def get_queryset(self, request):
|
||||
queryset = super().get_queryset(request)
|
||||
return queryset.annotate(_error_count=Count("errors"))
|
||||
|
||||
@admin.display(ordering="_error_count", description="Errors")
|
||||
def error_count(self, obj):
|
||||
return getattr(obj, "_error_count", 0)
|
||||
|
||||
@admin.display(description="Error summary")
|
||||
def short_error_summary(self, obj):
|
||||
if not obj.error_summary:
|
||||
return "-"
|
||||
return (obj.error_summary[:90] + "...") if len(obj.error_summary) > 90 else obj.error_summary
|
||||
@admin.register(IngestionRun)
|
||||
class LegacyIngestionRunAdmin(admin.ModelAdmin):
|
||||
list_display = ("provider_namespace", "job_type", "status", "started_at", "finished_at")
|
||||
list_filter = ("provider_namespace", "job_type", "status")
|
||||
search_fields = ("provider_namespace", "error_summary")
|
||||
|
||||
|
||||
@admin.register(IngestionError)
|
||||
class IngestionErrorAdmin(admin.ModelAdmin):
|
||||
class LegacyIngestionErrorAdmin(admin.ModelAdmin):
|
||||
list_display = ("provider_namespace", "entity_type", "external_id", "severity", "occurred_at")
|
||||
list_filter = ("severity", "provider_namespace")
|
||||
search_fields = ("entity_type", "external_id", "message")
|
||||
readonly_fields = (
|
||||
"ingestion_run",
|
||||
"provider_namespace",
|
||||
"entity_type",
|
||||
"external_id",
|
||||
"severity",
|
||||
"message",
|
||||
"raw_payload",
|
||||
"occurred_at",
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user