phase3: add normalized domain schema, admin, services, and multistage docker build
This commit is contained in:
32
apps/ingestion/admin.py
Normal file
32
apps/ingestion/admin.py
Normal file
@ -0,0 +1,32 @@
|
||||
from django.contrib import admin
|
||||
|
||||
from .models import IngestionError, IngestionRun
|
||||
|
||||
|
||||
class IngestionErrorInline(admin.TabularInline):
|
||||
model = IngestionError
|
||||
extra = 0
|
||||
readonly_fields = ("occurred_at",)
|
||||
|
||||
|
||||
@admin.register(IngestionRun)
|
||||
class IngestionRunAdmin(admin.ModelAdmin):
|
||||
list_display = (
|
||||
"provider_namespace",
|
||||
"job_type",
|
||||
"status",
|
||||
"records_processed",
|
||||
"records_failed",
|
||||
"started_at",
|
||||
"finished_at",
|
||||
)
|
||||
list_filter = ("provider_namespace", "job_type", "status")
|
||||
search_fields = ("provider_namespace",)
|
||||
inlines = (IngestionErrorInline,)
|
||||
|
||||
|
||||
@admin.register(IngestionError)
|
||||
class IngestionErrorAdmin(admin.ModelAdmin):
|
||||
list_display = ("provider_namespace", "entity_type", "external_id", "severity", "occurred_at")
|
||||
list_filter = ("severity", "provider_namespace")
|
||||
search_fields = ("entity_type", "external_id", "message")
|
||||
84
apps/ingestion/migrations/0001_initial.py
Normal file
84
apps/ingestion/migrations/0001_initial.py
Normal file
@ -0,0 +1,84 @@
|
||||
# Generated by Django 5.2.12 on 2026-03-10 09:33
|
||||
|
||||
import django.db.models.deletion
|
||||
from django.conf import settings
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
initial = True
|
||||
|
||||
dependencies = [
|
||||
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='IngestionRun',
|
||||
fields=[
|
||||
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('provider_namespace', models.CharField(max_length=80)),
|
||||
('job_type', models.CharField(choices=[('full_sync', 'Full Sync'), ('incremental', 'Incremental'), ('manual', 'Manual')], max_length=32)),
|
||||
('status', models.CharField(choices=[('pending', 'Pending'), ('running', 'Running'), ('success', 'Success'), ('failed', 'Failed'), ('canceled', 'Canceled')], default='pending', max_length=24)),
|
||||
('started_at', models.DateTimeField(blank=True, null=True)),
|
||||
('finished_at', models.DateTimeField(blank=True, null=True)),
|
||||
('records_processed', models.PositiveIntegerField(default=0)),
|
||||
('records_created', models.PositiveIntegerField(default=0)),
|
||||
('records_updated', models.PositiveIntegerField(default=0)),
|
||||
('records_failed', models.PositiveIntegerField(default=0)),
|
||||
('context', models.JSONField(blank=True, default=dict)),
|
||||
('raw_payload', models.JSONField(blank=True, default=dict)),
|
||||
('created_at', models.DateTimeField(auto_now_add=True)),
|
||||
('triggered_by', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='ingestion_runs', to=settings.AUTH_USER_MODEL)),
|
||||
],
|
||||
options={
|
||||
'ordering': ['-created_at'],
|
||||
},
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='IngestionError',
|
||||
fields=[
|
||||
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('provider_namespace', models.CharField(max_length=80)),
|
||||
('entity_type', models.CharField(blank=True, max_length=80)),
|
||||
('external_id', models.CharField(blank=True, max_length=160)),
|
||||
('severity', models.CharField(choices=[('warning', 'Warning'), ('error', 'Error'), ('critical', 'Critical')], default='error', max_length=16)),
|
||||
('message', models.TextField()),
|
||||
('raw_payload', models.JSONField(blank=True, default=dict)),
|
||||
('occurred_at', models.DateTimeField(auto_now_add=True)),
|
||||
('ingestion_run', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='errors', to='ingestion.ingestionrun')),
|
||||
],
|
||||
options={
|
||||
'ordering': ['-occurred_at'],
|
||||
},
|
||||
),
|
||||
migrations.AddIndex(
|
||||
model_name='ingestionrun',
|
||||
index=models.Index(fields=['provider_namespace', 'status'], name='ingestion_i_provide_1145f9_idx'),
|
||||
),
|
||||
migrations.AddIndex(
|
||||
model_name='ingestionrun',
|
||||
index=models.Index(fields=['provider_namespace', 'job_type'], name='ingestion_i_provide_2f19cd_idx'),
|
||||
),
|
||||
migrations.AddIndex(
|
||||
model_name='ingestionrun',
|
||||
index=models.Index(fields=['started_at'], name='ingestion_i_started_525875_idx'),
|
||||
),
|
||||
migrations.AddIndex(
|
||||
model_name='ingestionrun',
|
||||
index=models.Index(fields=['finished_at'], name='ingestion_i_finishe_128c60_idx'),
|
||||
),
|
||||
migrations.AddIndex(
|
||||
model_name='ingestionerror',
|
||||
index=models.Index(fields=['ingestion_run', 'occurred_at'], name='ingestion_i_ingesti_c957d2_idx'),
|
||||
),
|
||||
migrations.AddIndex(
|
||||
model_name='ingestionerror',
|
||||
index=models.Index(fields=['provider_namespace', 'entity_type', 'external_id'], name='ingestion_i_provide_d3f744_idx'),
|
||||
),
|
||||
migrations.AddIndex(
|
||||
model_name='ingestionerror',
|
||||
index=models.Index(fields=['severity'], name='ingestion_i_severit_9e9331_idx'),
|
||||
),
|
||||
]
|
||||
79
apps/ingestion/models.py
Normal file
79
apps/ingestion/models.py
Normal file
@ -0,0 +1,79 @@
|
||||
from django.conf import settings
|
||||
from django.db import models
|
||||
|
||||
|
||||
class IngestionRun(models.Model):
|
||||
class RunStatus(models.TextChoices):
|
||||
PENDING = "pending", "Pending"
|
||||
RUNNING = "running", "Running"
|
||||
SUCCESS = "success", "Success"
|
||||
FAILED = "failed", "Failed"
|
||||
CANCELED = "canceled", "Canceled"
|
||||
|
||||
class JobType(models.TextChoices):
|
||||
FULL_SYNC = "full_sync", "Full Sync"
|
||||
INCREMENTAL = "incremental", "Incremental"
|
||||
MANUAL = "manual", "Manual"
|
||||
|
||||
provider_namespace = models.CharField(max_length=80)
|
||||
job_type = models.CharField(max_length=32, choices=JobType.choices)
|
||||
status = models.CharField(max_length=24, choices=RunStatus.choices, default=RunStatus.PENDING)
|
||||
triggered_by = models.ForeignKey(
|
||||
settings.AUTH_USER_MODEL,
|
||||
on_delete=models.SET_NULL,
|
||||
blank=True,
|
||||
null=True,
|
||||
related_name="ingestion_runs",
|
||||
)
|
||||
started_at = models.DateTimeField(blank=True, null=True)
|
||||
finished_at = models.DateTimeField(blank=True, null=True)
|
||||
records_processed = models.PositiveIntegerField(default=0)
|
||||
records_created = models.PositiveIntegerField(default=0)
|
||||
records_updated = models.PositiveIntegerField(default=0)
|
||||
records_failed = models.PositiveIntegerField(default=0)
|
||||
context = models.JSONField(default=dict, blank=True)
|
||||
raw_payload = models.JSONField(default=dict, blank=True)
|
||||
created_at = models.DateTimeField(auto_now_add=True)
|
||||
|
||||
class Meta:
|
||||
ordering = ["-created_at"]
|
||||
indexes = [
|
||||
models.Index(fields=["provider_namespace", "status"]),
|
||||
models.Index(fields=["provider_namespace", "job_type"]),
|
||||
models.Index(fields=["started_at"]),
|
||||
models.Index(fields=["finished_at"]),
|
||||
]
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"{self.provider_namespace} | {self.job_type} | {self.status}"
|
||||
|
||||
|
||||
class IngestionError(models.Model):
|
||||
class Severity(models.TextChoices):
|
||||
WARNING = "warning", "Warning"
|
||||
ERROR = "error", "Error"
|
||||
CRITICAL = "critical", "Critical"
|
||||
|
||||
ingestion_run = models.ForeignKey(
|
||||
"ingestion.IngestionRun",
|
||||
on_delete=models.CASCADE,
|
||||
related_name="errors",
|
||||
)
|
||||
provider_namespace = models.CharField(max_length=80)
|
||||
entity_type = models.CharField(max_length=80, blank=True)
|
||||
external_id = models.CharField(max_length=160, blank=True)
|
||||
severity = models.CharField(max_length=16, choices=Severity.choices, default=Severity.ERROR)
|
||||
message = models.TextField()
|
||||
raw_payload = models.JSONField(default=dict, blank=True)
|
||||
occurred_at = models.DateTimeField(auto_now_add=True)
|
||||
|
||||
class Meta:
|
||||
ordering = ["-occurred_at"]
|
||||
indexes = [
|
||||
models.Index(fields=["ingestion_run", "occurred_at"]),
|
||||
models.Index(fields=["provider_namespace", "entity_type", "external_id"]),
|
||||
models.Index(fields=["severity"]),
|
||||
]
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"{self.provider_namespace}:{self.entity_type} [{self.severity}]"
|
||||
0
apps/ingestion/services/__init__.py
Normal file
0
apps/ingestion/services/__init__.py
Normal file
46
apps/ingestion/services/runs.py
Normal file
46
apps/ingestion/services/runs.py
Normal file
@ -0,0 +1,46 @@
|
||||
from django.utils import timezone
|
||||
|
||||
from apps.ingestion.models import IngestionError, IngestionRun
|
||||
|
||||
|
||||
def start_ingestion_run(*, provider_namespace: str, job_type: str, triggered_by=None, context: dict | None = None) -> IngestionRun:
|
||||
return IngestionRun.objects.create(
|
||||
provider_namespace=provider_namespace,
|
||||
job_type=job_type,
|
||||
status=IngestionRun.RunStatus.RUNNING,
|
||||
triggered_by=triggered_by,
|
||||
started_at=timezone.now(),
|
||||
context=context or {},
|
||||
)
|
||||
|
||||
|
||||
def finish_ingestion_run(*, run: IngestionRun, status: str, processed: int = 0, created: int = 0, updated: int = 0, failed: int = 0) -> IngestionRun:
|
||||
run.status = status
|
||||
run.records_processed = processed
|
||||
run.records_created = created
|
||||
run.records_updated = updated
|
||||
run.records_failed = failed
|
||||
run.finished_at = timezone.now()
|
||||
run.save(
|
||||
update_fields=[
|
||||
"status",
|
||||
"records_processed",
|
||||
"records_created",
|
||||
"records_updated",
|
||||
"records_failed",
|
||||
"finished_at",
|
||||
]
|
||||
)
|
||||
return run
|
||||
|
||||
|
||||
def log_ingestion_error(*, run: IngestionRun, message: str, provider_namespace: str, severity: str = IngestionError.Severity.ERROR, entity_type: str = "", external_id: str = "", raw_payload: dict | None = None) -> IngestionError:
|
||||
return IngestionError.objects.create(
|
||||
ingestion_run=run,
|
||||
provider_namespace=provider_namespace,
|
||||
message=message,
|
||||
severity=severity,
|
||||
entity_type=entity_type,
|
||||
external_id=external_id,
|
||||
raw_payload=raw_payload or {},
|
||||
)
|
||||
Reference in New Issue
Block a user