dataloader/TZ.md

14 KiB
Raw Blame History

ТЗ: dataloader (один пакет, async, PG-очередь, LISTEN/NOTIFY)

1) Назначение и рамки

dataloader - сервис постановки и исполнения долгих ETL-задач через одну общую очередь в Postgres. Сервис предоставляет HTTP-ручки для триггера задач, мониторинга статуса и отмены; внутри процесса запускает N асинхронных воркеров, которые конкурируют за задачи через SELECT … FOR UPDATE SKIP LOCKED, держат lease/heartbeat, делают идемпотентные записи в целевые БД и корректно обрабатывают повторы.

Архитектura и инфраструктурные части соответствуют шаблону rest_template.md: единый пакет, os_router.py с /health и /status, middleware логирования, структура каталогов и конфиг-классы - как в шаблоне.


2) Архитектура (одно приложение, async)

  • FastAPI-приложение: HTTP API v1, инфраструктурные роуты (/health, /status) из шаблона, middleware и логирование из шаблона.
  • WorkerManager: на startup читает конфиг (WORKERS_JSON) и поднимает M асинхронных воркер-циклов (по очередям и уровням параллелизма). На shutdown - мягкая остановка.
  • PG Queue: одна таблица dl_jobs на все очереди и сервисы; журнал dl_job_events; триггеры LISTEN/NOTIFY для пробуждения воркеров без активного поллинга.

3) Структура репозитория (один пакет, как в шаблоне)

dataloader/
├── src/
│   └── dataloader/
│       ├── __main__.py            # точка входа FastAPI + запуск WorkerManager (по шаблону)
│       ├── config.py              # Pydantic Settings: DSN, тайминги, WORKERS_JSON
│       ├── base.py
│       ├── context.py             # AppContext: engine/sessionmaker, DI
│       ├── exceptions.py
│       ├── logger/                # не менять тип и контракты
│       │   ├── __init__.py
│       │   ├── context_vars.py
│       │   ├── logger.py
│       │   ├── models.py
│       │   ├── utils.py
│       │   └── uvicorn_logging_config.py
│       ├── api/
│       │   ├── __init__.py        # регистрация роутов (v1, os_router, metric_router)
│       │   ├── middleware.py
│       │   ├── os_router.py       # /health, /status
│       │   ├── metric_router.py
│       │   └── v1/
│       │       ├── router.py      # POST /jobs/trigger, GET /jobs/{id}/status, POST /jobs/{id}/cancel
│       │       ├── schemas.py     # pydantic запросы/ответы
│       │       ├── service.py     # бизнес-логика
│       │       ├── models.py
│       │       ├── exceptions.py
│       │       └── utils.py
│       ├── storage/
│       │   ├── db.py              # async engine + sessionmaker
│       │   └── repositories.py    # SQL-операции по очереди и событиям
│       └── workers/
│           ├── manager.py         # создание asyncio Tasks воркеров по конфигу
│           ├── base.py            # общий PG-воркер: claim/lease/heartbeat/retry
│           └── pipelines/
│               ├── __init__.py
│               └── registry.py    # реестр обработчиков по task
├── tests/
│   └── integration_tests/
│       ├── conftest.py
│       └── v1_api/
│           ├── constants.py
│           └── test_service.py
├── pyproject.toml
├── Dockerfile
├── .env
└── .gitignore

Структура, ролевые файлы и подход соответствуют rest_template.md.


4) DDL очереди (общая для всех сервисов)

Таблицы уже созданы и доступны приложению.

CREATE TYPE dl_status AS ENUM ('queued','running','succeeded','failed','canceled','lost');

CREATE TABLE dl_jobs (
  job_id              uuid PRIMARY KEY,
  queue               text NOT NULL,
  task                text NOT NULL,
  args                jsonb NOT NULL DEFAULT '{}'::jsonb,
  idempotency_key     text UNIQUE,
  lock_key            text NOT NULL,
  partition_key       text NOT NULL DEFAULT '',
  priority            int  NOT NULL DEFAULT 100,
  available_at        timestamptz NOT NULL DEFAULT now(),
  status              dl_status   NOT NULL DEFAULT 'queued',
  attempt             int         NOT NULL DEFAULT 0,
  max_attempts        int         NOT NULL DEFAULT 5,
  lease_ttl_sec       int         NOT NULL DEFAULT 60,
  lease_expires_at    timestamptz,
  heartbeat_at        timestamptz,
  cancel_requested    boolean     NOT NULL DEFAULT false,
  progress            jsonb       NOT NULL DEFAULT '{}'::jsonb,
  error               text,
  producer            text,
  consumer_group      text,
  created_at          timestamptz NOT NULL DEFAULT now(),
  started_at          timestamptz,
  finished_at         timestamptz,
  CONSTRAINT dl_jobs_chk_positive CHECK (priority >= 0 AND attempt >= 0 AND max_attempts >= 0 AND lease_ttl_sec > 0)
);

CREATE INDEX ix_dl_jobs_claim ON dl_jobs(queue, available_at, priority, created_at)
  WHERE status = 'queued';

CREATE INDEX ix_dl_jobs_running_lease ON dl_jobs(lease_expires_at)
  WHERE status = 'running';

CREATE INDEX ix_dl_jobs_status_queue ON dl_jobs(status, queue);

CREATE TABLE dl_job_events (
  event_id     bigserial PRIMARY KEY,
  job_id       uuid NOT NULL REFERENCES dl_jobs(job_id) ON DELETE CASCADE,
  queue        text NOT NULL,
  ts           timestamptz NOT NULL DEFAULT now(),
  kind         text NOT NULL,
  payload      jsonb
);

CREATE OR REPLACE FUNCTION notify_job_ready() RETURNS trigger AS $$
BEGIN
  IF (TG_OP = 'INSERT') THEN
    PERFORM pg_notify('dl_jobs', NEW.queue);
    RETURN NEW;
  ELSIF (TG_OP = 'UPDATE') THEN
    IF NEW.status = 'queued' AND NEW.available_at <= now()
       AND (OLD.status IS DISTINCT FROM NEW.status OR OLD.available_at IS DISTINCT FROM NEW.available_at) THEN
      PERFORM pg_notify('dl_jobs', NEW.queue);
    END IF;
    RETURN NEW;
  END IF;
  RETURN NEW;
END $$ LANGUAGE plpgsql;

DROP TRIGGER IF EXISTS dl_jobs_notify_ins ON dl_jobs;
CREATE TRIGGER dl_jobs_notify_ins
AFTER INSERT ON dl_jobs
FOR EACH ROW EXECUTE FUNCTION notify_job_ready();

DROP TRIGGER IF EXISTS dl_jobs_notify_upd ON dl_jobs;
CREATE TRIGGER dl_jobs_notify_upd
AFTER UPDATE OF status, available_at ON dl_jobs
FOR EACH ROW EXECUTE FUNCTION notify_job_ready();

5) Контракты API (v1)

  • POST /api/v1/jobs/trigger Вход: {queue: str, task: str, args?: dict, idempotency_key?: str, lock_key: str, partition_key?: str, priority?: int, available_at?: RFC3339} Выход: {job_id: UUID, status: str} Поведение: идемпотентная постановка; триггер LISTEN/NOTIFY срабатывает за счёт триггера в БД.

  • GET /api/v1/jobs/{job_id}/status Выход: {job_id, status, attempt, started_at?, finished_at?, heartbeat_at?, error?, progress: {}}

  • POST /api/v1/jobs/{job_id}/cancel Выход: {… как status …} Поведение: устанавливает cancel_requested = true. Воркер кооперативно завершает задачу между чанками.

Инфраструктурные эндпоинты /health, /status, мидлвар и регистрация роутов - как в шаблоне.


6) Протокол выполнения (воркер)

  1. Claim одной задачи:
WITH cte AS (
  SELECT job_id
  FROM dl_jobs
  WHERE status='queued' AND queue=:queue AND available_at <= now()
  ORDER BY priority ASC, created_at ASC
  FOR UPDATE SKIP LOCKED
  LIMIT 1
)
UPDATE dl_jobs j
SET status='running',
    started_at = COALESCE(started_at, now()),
    attempt = attempt + 1,
    lease_expires_at = now() + make_interval(secs => j.lease_ttl_sec),
    heartbeat_at = now()
FROM cte
WHERE j.job_id = cte.job_id
RETURNING j.job_id, j.task, j.args, j.lock_key, j.partition_key, j.lease_ttl_sec;

Затем SELECT pg_try_advisory_lock(hashtext(:lock_key)). Если false - backoff:

UPDATE dl_jobs
SET status='queued', available_at = now() + make_interval(secs => :sec)
WHERE job_id=:jid;
  1. Heartbeat раз в DL_HEARTBEAT_SEC:
UPDATE dl_jobs
SET heartbeat_at = now(),
    lease_expires_at = now() + make_interval(secs => :ttl)
WHERE job_id = :jid AND status='running';
  1. Завершение:
  • Успех:
UPDATE dl_jobs
SET status='succeeded', finished_at=now(), lease_expires_at=NULL
WHERE job_id=:jid;
  • Ошибка/ретрай:
UPDATE dl_jobs
SET status = CASE WHEN attempt < max_attempts THEN 'queued' ELSE 'failed' END,
    available_at = CASE WHEN attempt < max_attempts THEN now() + make_interval(secs => 30 * attempt) ELSE now() END,
    error = :err,
    lease_expires_at = NULL,
    finished_at = CASE WHEN attempt >= max_attempts THEN now() ELSE NULL END
WHERE job_id=:jid;

Всегда выставлять/снимать advisory-lock на lock_key.

  1. Отмена: воркер проверяет cancel_requested между чанками; при true завершает пайплайн (обычно как canceled либо как failed без ретраев - политика проекта).

  2. Reaper (фон у приложения): раз в DL_REAPER_PERIOD_SEC возвращает «потерянные» задачи в очередь.

UPDATE dl_jobs
SET status='queued', available_at=now(), lease_expires_at=NULL
WHERE status='running'
  AND lease_expires_at IS NOT NULL
  AND lease_expires_at < now()
RETURNING job_id;

7) Оптимизация и SLA

  • Claim - O(log N) благодаря частичному индексу ix_dl_jobs_claim.
  • Reaper - O(log N) по индексу ix_dl_jobs_running_lease.
  • /health - без БД; время ответа ≤ 20 мс. /jobs/* - не держат долгих транзакций.
  • Гарантия доставки: at-least-once; операции записи в целевые таблицы - идемпотентны (реализуется в конкретных пайплайнах).
  • Конкуренция: один lock_key одновременно исполняется одним воркером; параллелизм достигается независимыми partition_key.

8) Конфигурация (ENV)

  • DL_DB_DSN - DSN Postgres (async).
  • WORKERS_JSON - JSON-список конфигураций воркеров, напр.: [{"queue":"load.cbr","concurrency":2},{"queue":"load.sgx","concurrency":1}].
  • DL_HEARTBEAT_SEC (деф. 10), DL_DEFAULT_LEASE_TTL_SEC (деф. 60), DL_REAPER_PERIOD_SEC (деф. 10), DL_CLAIM_BACKOFF_SEC (деф. 15).
  • Логирование, middleware, uvicorn_logging_config - из шаблона без изменения контрактов.

9) Эксплуатация и деплой

  • Один контейнер, один Pod, несколько async-воркеров внутри процесса (через WorkerManager).
  • Масштабирование - количеством реплик Deployment: очередь в БД, FOR UPDATE SKIP LOCKED и advisory-lock обеспечат корректность в гонке.
  • Пробы: readiness/liveness на /health из os_router.py.
  • Завершение: на SIGTERM - остановить reaper, подать сигнал воркерам для мягкой остановки, дождаться тасков с таймаутом.

10) Безопасность, аудит, наблюдаемость

  • Структурные логи через logger/* шаблона; маскирование чувствительных полей - как в logger/utils.py.
  • Журнал жизненного цикла в dl_job_events (queued/picked/heartbeat/requeue/done/failed/canceled).
  • Метрики (BETA) - через metric_router.py из шаблона при необходимости.

11) Тест-план

  • Интеграционные тесты v1: постановка → статус → отмена.
  • E2E: постановка → claim мок-воркером → heartbeat → done → статус succeeded.
  • Конкуренция: два воркера на один lock_key → один backoff, один исполняет.
  • Reaper: просроченный lease → возврат в queued.

12) TODO

  • context.py: инициализация engine/sessionmaker, AppContext (как в шаблоне).
  • api/v1/router.py: trigger, status, cancel.
  • api/v1/service.py: бизнес-логика поверх репозитория.
  • storage/repositories.py: SQL для create_or_get, get_status, cancel, requeue_lost.
  • workers/base.py: claim/heartbeat/finish/retry/cancel/advisory-lock.
  • workers/manager.py: парсинг WORKERS_JSON, создание тасков, graceful shutdown.
  • Тесты tests/integration_tests/v1_api/test_service.py по стилю шаблона.
  • Документация .env и примеры WORKERS_JSON.
  • БД-таблицы уже созданы (DDL применён).