# ТЗ: `dataloader` (один пакет, async, PG-очередь, LISTEN/NOTIFY) ## 1) Назначение и рамки `dataloader` — сервис постановки и исполнения долгих ETL-задач через одну общую очередь в Postgres. Сервис предоставляет HTTP-ручки для триггера задач, мониторинга статуса и отмены; внутри процесса запускает N асинхронных воркеров, которые конкурируют за задачи через `SELECT … FOR UPDATE SKIP LOCKED`, держат lease/heartbeat, делают идемпотентные записи в целевые БД и корректно обрабатывают повторы. Архитектura и инфраструктурные части соответствуют шаблону `rest_template.md`: единый пакет, `os_router.py` с `/health` и `/status`, middleware логирования, структура каталогов и конфиг-классы — **как в шаблоне**. --- ## 2) Архитектура (одно приложение, async) * **FastAPI-приложение**: HTTP API v1, инфраструктурные роуты (`/health`, `/status`) из шаблона, middleware и логирование из шаблона. * **WorkerManager**: на `startup` читает конфиг (`WORKERS_JSON`) и поднимает M асинхронных воркер-циклов (по очередям и уровням параллелизма). На `shutdown` — мягкая остановка. * **PG Queue**: одна таблица `dl_jobs` на все очереди и сервисы; журнал `dl_job_events`; триггеры LISTEN/NOTIFY для пробуждения воркеров без активного поллинга. --- ## 3) Структура репозитория (один пакет, как в шаблоне) ``` dataloader/ ├── src/ │ └── dataloader/ │ ├── __main__.py # точка входа FastAPI + запуск WorkerManager (по шаблону) │ ├── config.py # Pydantic Settings: DSN, тайминги, WORKERS_JSON │ ├── base.py │ ├── context.py # AppContext: engine/sessionmaker, DI │ ├── exceptions.py │ ├── logger/ # не менять тип и контракты │ │ ├── __init__.py │ │ ├── context_vars.py │ │ ├── logger.py │ │ ├── models.py │ │ ├── utils.py │ │ └── uvicorn_logging_config.py │ ├── api/ │ │ ├── __init__.py # регистрация роутов (v1, os_router, metric_router) │ │ ├── middleware.py │ │ ├── os_router.py # /health, /status │ │ ├── metric_router.py │ │ └── v1/ │ │ ├── router.py # POST /jobs/trigger, GET /jobs/{id}/status, POST /jobs/{id}/cancel │ │ ├── schemas.py # pydantic запросы/ответы │ │ ├── service.py # бизнес-логика │ │ ├── models.py │ │ ├── exceptions.py │ │ └── utils.py │ ├── storage/ │ │ ├── db.py # async engine + sessionmaker │ │ └── repositories.py # SQL-операции по очереди и событиям │ └── workers/ │ ├── manager.py # создание asyncio Tasks воркеров по конфигу │ ├── base.py # общий PG-воркер: claim/lease/heartbeat/retry │ └── pipelines/ │ ├── __init__.py │ └── registry.py # реестр обработчиков по task ├── tests/ │ └── integration_tests/ │ ├── conftest.py │ └── v1_api/ │ ├── constants.py │ └── test_service.py ├── pyproject.toml ├── Dockerfile ├── .env └── .gitignore ``` Структура, ролевые файлы и подход соответствуют `rest_template.md`. --- ## 4) DDL очереди (общая для всех сервисов) > Таблицы уже созданы и доступны приложению. ```sql CREATE TYPE dl_status AS ENUM ('queued','running','succeeded','failed','canceled','lost'); CREATE TABLE dl_jobs ( job_id uuid PRIMARY KEY, queue text NOT NULL, task text NOT NULL, args jsonb NOT NULL DEFAULT '{}'::jsonb, idempotency_key text UNIQUE, lock_key text NOT NULL, partition_key text NOT NULL DEFAULT '', priority int NOT NULL DEFAULT 100, available_at timestamptz NOT NULL DEFAULT now(), status dl_status NOT NULL DEFAULT 'queued', attempt int NOT NULL DEFAULT 0, max_attempts int NOT NULL DEFAULT 5, lease_ttl_sec int NOT NULL DEFAULT 60, lease_expires_at timestamptz, heartbeat_at timestamptz, cancel_requested boolean NOT NULL DEFAULT false, progress jsonb NOT NULL DEFAULT '{}'::jsonb, error text, producer text, consumer_group text, created_at timestamptz NOT NULL DEFAULT now(), started_at timestamptz, finished_at timestamptz, CONSTRAINT dl_jobs_chk_positive CHECK (priority >= 0 AND attempt >= 0 AND max_attempts >= 0 AND lease_ttl_sec > 0) ); CREATE INDEX ix_dl_jobs_claim ON dl_jobs(queue, available_at, priority, created_at) WHERE status = 'queued'; CREATE INDEX ix_dl_jobs_running_lease ON dl_jobs(lease_expires_at) WHERE status = 'running'; CREATE INDEX ix_dl_jobs_status_queue ON dl_jobs(status, queue); CREATE TABLE dl_job_events ( event_id bigserial PRIMARY KEY, job_id uuid NOT NULL REFERENCES dl_jobs(job_id) ON DELETE CASCADE, queue text NOT NULL, ts timestamptz NOT NULL DEFAULT now(), kind text NOT NULL, payload jsonb ); CREATE OR REPLACE FUNCTION notify_job_ready() RETURNS trigger AS $$ BEGIN IF (TG_OP = 'INSERT') THEN PERFORM pg_notify('dl_jobs', NEW.queue); RETURN NEW; ELSIF (TG_OP = 'UPDATE') THEN IF NEW.status = 'queued' AND NEW.available_at <= now() AND (OLD.status IS DISTINCT FROM NEW.status OR OLD.available_at IS DISTINCT FROM NEW.available_at) THEN PERFORM pg_notify('dl_jobs', NEW.queue); END IF; RETURN NEW; END IF; RETURN NEW; END $$ LANGUAGE plpgsql; DROP TRIGGER IF EXISTS dl_jobs_notify_ins ON dl_jobs; CREATE TRIGGER dl_jobs_notify_ins AFTER INSERT ON dl_jobs FOR EACH ROW EXECUTE FUNCTION notify_job_ready(); DROP TRIGGER IF EXISTS dl_jobs_notify_upd ON dl_jobs; CREATE TRIGGER dl_jobs_notify_upd AFTER UPDATE OF status, available_at ON dl_jobs FOR EACH ROW EXECUTE FUNCTION notify_job_ready(); ``` --- ## 5) Контракты API (v1) * `POST /api/v1/jobs/trigger` Вход: `{queue: str, task: str, args?: dict, idempotency_key?: str, lock_key: str, partition_key?: str, priority?: int, available_at?: RFC3339}` Выход: `{job_id: UUID, status: str}` Поведение: идемпотентная постановка; триггер LISTEN/NOTIFY срабатывает за счёт триггера в БД. * `GET /api/v1/jobs/{job_id}/status` Выход: `{job_id, status, attempt, started_at?, finished_at?, heartbeat_at?, error?, progress: {}}` * `POST /api/v1/jobs/{job_id}/cancel` Выход: `{… как status …}` Поведение: устанавливает `cancel_requested = true`. Воркер кооперативно завершает задачу между чанками. Инфраструктурные эндпоинты `/health`, `/status`, мидлвар и регистрация роутов — **как в шаблоне**. --- ## 6) Протокол выполнения (воркер) 1. **Claim** одной задачи: ```sql WITH cte AS ( SELECT job_id FROM dl_jobs WHERE status='queued' AND queue=:queue AND available_at <= now() ORDER BY priority ASC, created_at ASC FOR UPDATE SKIP LOCKED LIMIT 1 ) UPDATE dl_jobs j SET status='running', started_at = COALESCE(started_at, now()), attempt = attempt + 1, lease_expires_at = now() + make_interval(secs => j.lease_ttl_sec), heartbeat_at = now() FROM cte WHERE j.job_id = cte.job_id RETURNING j.job_id, j.task, j.args, j.lock_key, j.partition_key, j.lease_ttl_sec; ``` Затем `SELECT pg_try_advisory_lock(hashtext(:lock_key))`. Если `false` — `backoff`: ```sql UPDATE dl_jobs SET status='queued', available_at = now() + make_interval(secs => :sec) WHERE job_id=:jid; ``` 2. **Heartbeat** раз в `DL_HEARTBEAT_SEC`: ```sql UPDATE dl_jobs SET heartbeat_at = now(), lease_expires_at = now() + make_interval(secs => :ttl) WHERE job_id = :jid AND status='running'; ``` 3. **Завершение**: * Успех: ```sql UPDATE dl_jobs SET status='succeeded', finished_at=now(), lease_expires_at=NULL WHERE job_id=:jid; ``` * Ошибка/ретрай: ```sql UPDATE dl_jobs SET status = CASE WHEN attempt < max_attempts THEN 'queued' ELSE 'failed' END, available_at = CASE WHEN attempt < max_attempts THEN now() + make_interval(secs => 30 * attempt) ELSE now() END, error = :err, lease_expires_at = NULL, finished_at = CASE WHEN attempt >= max_attempts THEN now() ELSE NULL END WHERE job_id=:jid; ``` Всегда выставлять/снимать advisory-lock на `lock_key`. 4. **Отмена**: воркер проверяет `cancel_requested` между чанками; при `true` завершает пайплайн (обычно как `canceled` либо как `failed` без ретраев — политика проекта). 5. **Reaper** (фон у приложения): раз в `DL_REAPER_PERIOD_SEC` возвращает «потерянные» задачи в очередь. ```sql UPDATE dl_jobs SET status='queued', available_at=now(), lease_expires_at=NULL WHERE status='running' AND lease_expires_at IS NOT NULL AND lease_expires_at < now() RETURNING job_id; ``` --- ## 7) Оптимизация и SLA * Claim — O(log N) благодаря частичному индексу `ix_dl_jobs_claim`. * Reaper — O(log N) по индексу `ix_dl_jobs_running_lease`. * `/health` — без БД; время ответа ≤ 20 мс. `/jobs/*` — не держат долгих транзакций. * Гарантия доставки: **at-least-once**; операции записи в целевые таблицы — идемпотентны (реализуется в конкретных пайплайнах). * Конкуренция: один `lock_key` одновременно исполняется одним воркером; параллелизм достигается независимыми `partition_key`. --- ## 8) Конфигурация (ENV) * `DL_DB_DSN` — DSN Postgres (async). * `WORKERS_JSON` — JSON-список конфигураций воркеров, напр.: `[{"queue":"load.cbr","concurrency":2},{"queue":"load.sgx","concurrency":1}]`. * `DL_HEARTBEAT_SEC` (деф. 10), `DL_DEFAULT_LEASE_TTL_SEC` (деф. 60), `DL_REAPER_PERIOD_SEC` (деф. 10), `DL_CLAIM_BACKOFF_SEC` (деф. 15). * Логирование, middleware, `uvicorn_logging_config` — **из шаблона без изменения контрактов**. --- ## 9) Эксплуатация и деплой * Один контейнер, один Pod, **несколько async-воркеров** внутри процесса (через `WorkerManager`). * Масштабирование — количеством реплик Deployment: очередь в БД, `FOR UPDATE SKIP LOCKED` и advisory-lock обеспечат корректность в гонке. * Пробы: `readiness/liveness` на `/health` из `os_router.py`. * Завершение: на SIGTERM — остановить reaper, подать сигнал воркерам для мягкой остановки, дождаться тасков с таймаутом. --- ## 10) Безопасность, аудит, наблюдаемость * Структурные логи через `logger/*` шаблона; маскирование чувствительных полей — как в `logger/utils.py`. * Журнал жизненного цикла в `dl_job_events` (queued/picked/heartbeat/requeue/done/failed/canceled). * Метрики (BETA) — через `metric_router.py` из шаблона при необходимости. --- ## 11) Тест-план * Интеграционные тесты `v1`: постановка → статус → отмена. * E2E: постановка → claim мок-воркером → heartbeat → done → статус `succeeded`. * Конкуренция: два воркера на один `lock_key` → один backoff, один исполняет. * Reaper: просроченный lease → возврат в `queued`. --- ## 12) TODO * [ ] `context.py`: инициализация engine/sessionmaker, AppContext (как в шаблоне). * [ ] `api/v1/router.py`: `trigger`, `status`, `cancel`. * [ ] `api/v1/service.py`: бизнес-логика поверх репозитория. * [ ] `storage/repositories.py`: SQL для create_or_get, get_status, cancel, requeue_lost. * [ ] `workers/base.py`: claim/heartbeat/finish/retry/cancel/advisory-lock. * [ ] `workers/manager.py`: парсинг `WORKERS_JSON`, создание тасков, graceful shutdown. * [ ] Тесты `tests/integration_tests/v1_api/test_service.py` по стилю шаблона. * [ ] Документация `.env` и примеры `WORKERS_JSON`. * [x] **БД-таблицы уже созданы** (DDL применён).