14 KiB
ТЗ: dataloader (один пакет, async, PG-очередь, LISTEN/NOTIFY)
1) Назначение и рамки
dataloader - сервис постановки и исполнения долгих ETL-задач через одну общую очередь в Postgres. Сервис предоставляет HTTP-ручки для триггера задач, мониторинга статуса и отмены; внутри процесса запускает N асинхронных воркеров, которые конкурируют за задачи через SELECT … FOR UPDATE SKIP LOCKED, держат lease/heartbeat, делают идемпотентные записи в целевые БД и корректно обрабатывают повторы.
Архитектura и инфраструктурные части соответствуют шаблону rest_template.md: единый пакет, os_router.py с /health и /status, middleware логирования, структура каталогов и конфиг-классы - как в шаблоне.
2) Архитектура (одно приложение, async)
- FastAPI-приложение: HTTP API v1, инфраструктурные роуты (
/health,/status) из шаблона, middleware и логирование из шаблона. - WorkerManager: на
startupчитает конфиг (WORKERS_JSON) и поднимает M асинхронных воркер-циклов (по очередям и уровням параллелизма). Наshutdown- мягкая остановка. - PG Queue: одна таблица
dl_jobsна все очереди и сервисы; журналdl_job_events; триггеры LISTEN/NOTIFY для пробуждения воркеров без активного поллинга.
3) Структура репозитория (один пакет, как в шаблоне)
dataloader/
├── src/
│ └── dataloader/
│ ├── __main__.py # точка входа FastAPI + запуск WorkerManager (по шаблону)
│ ├── config.py # Pydantic Settings: DSN, тайминги, WORKERS_JSON
│ ├── base.py
│ ├── context.py # AppContext: engine/sessionmaker, DI
│ ├── exceptions.py
│ ├── logger/ # не менять тип и контракты
│ │ ├── __init__.py
│ │ ├── context_vars.py
│ │ ├── logger.py
│ │ ├── models.py
│ │ ├── utils.py
│ │ └── uvicorn_logging_config.py
│ ├── api/
│ │ ├── __init__.py # регистрация роутов (v1, os_router, metric_router)
│ │ ├── middleware.py
│ │ ├── os_router.py # /health, /status
│ │ ├── metric_router.py
│ │ └── v1/
│ │ ├── router.py # POST /jobs/trigger, GET /jobs/{id}/status, POST /jobs/{id}/cancel
│ │ ├── schemas.py # pydantic запросы/ответы
│ │ ├── service.py # бизнес-логика
│ │ ├── models.py
│ │ ├── exceptions.py
│ │ └── utils.py
│ ├── storage/
│ │ ├── db.py # async engine + sessionmaker
│ │ └── repositories.py # SQL-операции по очереди и событиям
│ └── workers/
│ ├── manager.py # создание asyncio Tasks воркеров по конфигу
│ ├── base.py # общий PG-воркер: claim/lease/heartbeat/retry
│ └── pipelines/
│ ├── __init__.py
│ └── registry.py # реестр обработчиков по task
├── tests/
│ └── integration_tests/
│ ├── conftest.py
│ └── v1_api/
│ ├── constants.py
│ └── test_service.py
├── pyproject.toml
├── Dockerfile
├── .env
└── .gitignore
Структура, ролевые файлы и подход соответствуют rest_template.md.
4) DDL очереди (общая для всех сервисов)
Таблицы уже созданы и доступны приложению.
CREATE TYPE dl_status AS ENUM ('queued','running','succeeded','failed','canceled','lost');
CREATE TABLE dl_jobs (
job_id uuid PRIMARY KEY,
queue text NOT NULL,
task text NOT NULL,
args jsonb NOT NULL DEFAULT '{}'::jsonb,
idempotency_key text UNIQUE,
lock_key text NOT NULL,
partition_key text NOT NULL DEFAULT '',
priority int NOT NULL DEFAULT 100,
available_at timestamptz NOT NULL DEFAULT now(),
status dl_status NOT NULL DEFAULT 'queued',
attempt int NOT NULL DEFAULT 0,
max_attempts int NOT NULL DEFAULT 5,
lease_ttl_sec int NOT NULL DEFAULT 60,
lease_expires_at timestamptz,
heartbeat_at timestamptz,
cancel_requested boolean NOT NULL DEFAULT false,
progress jsonb NOT NULL DEFAULT '{}'::jsonb,
error text,
producer text,
consumer_group text,
created_at timestamptz NOT NULL DEFAULT now(),
started_at timestamptz,
finished_at timestamptz,
CONSTRAINT dl_jobs_chk_positive CHECK (priority >= 0 AND attempt >= 0 AND max_attempts >= 0 AND lease_ttl_sec > 0)
);
CREATE INDEX ix_dl_jobs_claim ON dl_jobs(queue, available_at, priority, created_at)
WHERE status = 'queued';
CREATE INDEX ix_dl_jobs_running_lease ON dl_jobs(lease_expires_at)
WHERE status = 'running';
CREATE INDEX ix_dl_jobs_status_queue ON dl_jobs(status, queue);
CREATE TABLE dl_job_events (
event_id bigserial PRIMARY KEY,
job_id uuid NOT NULL REFERENCES dl_jobs(job_id) ON DELETE CASCADE,
queue text NOT NULL,
ts timestamptz NOT NULL DEFAULT now(),
kind text NOT NULL,
payload jsonb
);
CREATE OR REPLACE FUNCTION notify_job_ready() RETURNS trigger AS $$
BEGIN
IF (TG_OP = 'INSERT') THEN
PERFORM pg_notify('dl_jobs', NEW.queue);
RETURN NEW;
ELSIF (TG_OP = 'UPDATE') THEN
IF NEW.status = 'queued' AND NEW.available_at <= now()
AND (OLD.status IS DISTINCT FROM NEW.status OR OLD.available_at IS DISTINCT FROM NEW.available_at) THEN
PERFORM pg_notify('dl_jobs', NEW.queue);
END IF;
RETURN NEW;
END IF;
RETURN NEW;
END $$ LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS dl_jobs_notify_ins ON dl_jobs;
CREATE TRIGGER dl_jobs_notify_ins
AFTER INSERT ON dl_jobs
FOR EACH ROW EXECUTE FUNCTION notify_job_ready();
DROP TRIGGER IF EXISTS dl_jobs_notify_upd ON dl_jobs;
CREATE TRIGGER dl_jobs_notify_upd
AFTER UPDATE OF status, available_at ON dl_jobs
FOR EACH ROW EXECUTE FUNCTION notify_job_ready();
5) Контракты API (v1)
-
POST /api/v1/jobs/triggerВход:{queue: str, task: str, args?: dict, idempotency_key?: str, lock_key: str, partition_key?: str, priority?: int, available_at?: RFC3339}Выход:{job_id: UUID, status: str}Поведение: идемпотентная постановка; триггер LISTEN/NOTIFY срабатывает за счёт триггера в БД. -
GET /api/v1/jobs/{job_id}/statusВыход:{job_id, status, attempt, started_at?, finished_at?, heartbeat_at?, error?, progress: {}} -
POST /api/v1/jobs/{job_id}/cancelВыход:{… как status …}Поведение: устанавливаетcancel_requested = true. Воркер кооперативно завершает задачу между чанками.
Инфраструктурные эндпоинты /health, /status, мидлвар и регистрация роутов - как в шаблоне.
6) Протокол выполнения (воркер)
- Claim одной задачи:
WITH cte AS (
SELECT job_id
FROM dl_jobs
WHERE status='queued' AND queue=:queue AND available_at <= now()
ORDER BY priority ASC, created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE dl_jobs j
SET status='running',
started_at = COALESCE(started_at, now()),
attempt = attempt + 1,
lease_expires_at = now() + make_interval(secs => j.lease_ttl_sec),
heartbeat_at = now()
FROM cte
WHERE j.job_id = cte.job_id
RETURNING j.job_id, j.task, j.args, j.lock_key, j.partition_key, j.lease_ttl_sec;
Затем SELECT pg_try_advisory_lock(hashtext(:lock_key)). Если false - backoff:
UPDATE dl_jobs
SET status='queued', available_at = now() + make_interval(secs => :sec)
WHERE job_id=:jid;
- Heartbeat раз в
DL_HEARTBEAT_SEC:
UPDATE dl_jobs
SET heartbeat_at = now(),
lease_expires_at = now() + make_interval(secs => :ttl)
WHERE job_id = :jid AND status='running';
- Завершение:
- Успех:
UPDATE dl_jobs
SET status='succeeded', finished_at=now(), lease_expires_at=NULL
WHERE job_id=:jid;
- Ошибка/ретрай:
UPDATE dl_jobs
SET status = CASE WHEN attempt < max_attempts THEN 'queued' ELSE 'failed' END,
available_at = CASE WHEN attempt < max_attempts THEN now() + make_interval(secs => 30 * attempt) ELSE now() END,
error = :err,
lease_expires_at = NULL,
finished_at = CASE WHEN attempt >= max_attempts THEN now() ELSE NULL END
WHERE job_id=:jid;
Всегда выставлять/снимать advisory-lock на lock_key.
-
Отмена: воркер проверяет
cancel_requestedмежду чанками; приtrueзавершает пайплайн (обычно какcanceledлибо какfailedбез ретраев - политика проекта). -
Reaper (фон у приложения): раз в
DL_REAPER_PERIOD_SECвозвращает «потерянные» задачи в очередь.
UPDATE dl_jobs
SET status='queued', available_at=now(), lease_expires_at=NULL
WHERE status='running'
AND lease_expires_at IS NOT NULL
AND lease_expires_at < now()
RETURNING job_id;
7) Оптимизация и SLA
- Claim - O(log N) благодаря частичному индексу
ix_dl_jobs_claim. - Reaper - O(log N) по индексу
ix_dl_jobs_running_lease. /health- без БД; время ответа ≤ 20 мс./jobs/*- не держат долгих транзакций.- Гарантия доставки: at-least-once; операции записи в целевые таблицы - идемпотентны (реализуется в конкретных пайплайнах).
- Конкуренция: один
lock_keyодновременно исполняется одним воркером; параллелизм достигается независимымиpartition_key.
8) Конфигурация (ENV)
DL_DB_DSN- DSN Postgres (async).WORKERS_JSON- JSON-список конфигураций воркеров, напр.:[{"queue":"load.cbr","concurrency":2},{"queue":"load.sgx","concurrency":1}].DL_HEARTBEAT_SEC(деф. 10),DL_DEFAULT_LEASE_TTL_SEC(деф. 60),DL_REAPER_PERIOD_SEC(деф. 10),DL_CLAIM_BACKOFF_SEC(деф. 15).- Логирование, middleware,
uvicorn_logging_config- из шаблона без изменения контрактов.
9) Эксплуатация и деплой
- Один контейнер, один Pod, несколько async-воркеров внутри процесса (через
WorkerManager). - Масштабирование - количеством реплик Deployment: очередь в БД,
FOR UPDATE SKIP LOCKEDи advisory-lock обеспечат корректность в гонке. - Пробы:
readiness/livenessна/healthизos_router.py. - Завершение: на SIGTERM - остановить reaper, подать сигнал воркерам для мягкой остановки, дождаться тасков с таймаутом.
10) Безопасность, аудит, наблюдаемость
- Структурные логи через
logger/*шаблона; маскирование чувствительных полей - как вlogger/utils.py. - Журнал жизненного цикла в
dl_job_events(queued/picked/heartbeat/requeue/done/failed/canceled). - Метрики (BETA) - через
metric_router.pyиз шаблона при необходимости.
11) Тест-план
- Интеграционные тесты
v1: постановка → статус → отмена. - E2E: постановка → claim мок-воркером → heartbeat → done → статус
succeeded. - Конкуренция: два воркера на один
lock_key→ один backoff, один исполняет. - Reaper: просроченный lease → возврат в
queued.
12) TODO
context.py: инициализация engine/sessionmaker, AppContext (как в шаблоне).api/v1/router.py:trigger,status,cancel.api/v1/service.py: бизнес-логика поверх репозитория.storage/repositories.py: SQL для create_or_get, get_status, cancel, requeue_lost.workers/base.py: claim/heartbeat/finish/retry/cancel/advisory-lock.workers/manager.py: парсингWORKERS_JSON, создание тасков, graceful shutdown.- Тесты
tests/integration_tests/v1_api/test_service.pyпо стилю шаблона. - Документация
.envи примерыWORKERS_JSON. - БД-таблицы уже созданы (DDL применён).