# Dataloader Асинхронный сервис постановки и исполнения ETL‑задач поверх очереди в PostgreSQL. Предоставляет HTTP API для триггера задач, мониторинга статуса и отмены. Внутри процесса поднимает пул воркеров, которые конкурируют за задачи через SELECT … FOR UPDATE SKIP LOCKED и обрабатывают их с учётом lease/heartbeat и кооперативной отмены. Для пробуждения воркеров используется LISTEN/NOTIFY. ## Содержание - О проекте - Быстрый старт - Конфигурация окружения - Архитектура и потоки данных - Структура проекта - Взаимодействие с БД - HTTP API - Воркеры, пайплайны и добавление новых ETL‑задач - Логирование, метрики, аудит - Тестирование - Эксплуатация и масштабирование ## О проекте Сервис решает типовую задачу фоновой обработки задач: один общий пул воркеров, одна очередь в БД, несколько типов задач (пайплайнов), идемпотентность, повторные попытки, контроль конкуренции через advisory‑lock по `lock_key`, кооперативная отмена, возврат «потерянных» задач. ## Быстрый старт 1. Установить зависимости (poetry): ```bash poetry install ``` 2. Подготовить PostgreSQL (см. DDL в `DDL.sql`) и переменные окружения (см. «Конфигурация»). 3. Запуск сервиса: ```bash poetry run python -m dataloader ``` 4. Проверка доступности: ```bash curl http://localhost:8081/health ``` 5. Пример постановки задачи: ```bash curl -X POST http://localhost:8081/api/v1/jobs/trigger \ -H "Content-Type: application/json" \ -d '{ "queue": "etl.default", "task": "noop", "args": {"sleep1": 1, "sleep2": 1, "sleep3": 1}, "lock_key": "customer:42", "priority": 100 }' ``` ## Конфигурация окружения Настройки собираются в `src/dataloader/config.py` через Pydantic Settings. - Приложение (`AppSettings`): - `APP_HOST` (def: `0.0.0.0`) - `APP_PORT` (def: `8081`) - `TIMEZONE` (def: `Europe/Moscow`) - Логирование (`LogSettings`): - `LOG_PATH`, `LOG_FILE_NAME`, `LOG_ROTATION` - `METRIC_PATH`, `METRIC_FILE_NAME` - `AUDIT_LOG_PATH`, `AUDIT_LOG_FILE_NAME` - `DEBUG` переключает уровень на DEBUG - PostgreSQL (`PGSettings`): - `PG_HOST`, `PG_PORT`, `PG_USER`, `PG_PASSWORD`, `PG_DATABASE` - `PG_SCHEMA_QUEUE` - схема таблиц очереди (логическая, маппится через `schema_translate_map`) - Параметры пула: `PG_USE_POOL`, `PG_POOL_SIZE`, `PG_MAX_OVERFLOW`, `PG_POOL_RECYCLE` - Таймауты: `PG_CONNECT_TIMEOUT`, `PG_COMMAND_TIMEOUT` - Воркеры (`WorkerSettings`): - `WORKERS_JSON` - список конфигураций воркеров, например: `[{"queue":"etl.default","concurrency":2}]` - `DL_HEARTBEAT_SEC` (def: 10) - `DL_DEFAULT_LEASE_TTL_SEC` (def: 60) - `DL_REAPER_PERIOD_SEC` (def: 10) - `DL_CLAIM_BACKOFF_SEC` (def: 15) ## Архитектура и потоки данных - HTTP слой: FastAPI‑приложение (`dataloader.api`) с v1 API и инфраструктурными маршрутами (`/health`, `/info`). - Контекст: `AppContext` инициализирует логирование, `AsyncEngine`, `async_sessionmaker` и предоставляет DI (`get_session`). - Очередь: одна таблица `dl_jobs` и журнал `dl_job_events` в PostgreSQL. Идемпотентность на уровне `idempotency_key`. Пробуждение воркеров через триггеры NOTIFY в БД и listener на стороне приложения. - Воркеры: `WorkerManager` поднимает N асинхронных воркеров (`PGWorker`) на основании `WORKERS_JSON`. Каждый воркер: 1) ждёт уведомление (LISTEN/NOTIFY) или таймаут, 2) пытается «захватить» одну задачу (SELECT … FOR UPDATE SKIP LOCKED), 3) выставляет `running`, получает advisory‑lock по `lock_key`, 4) исполняет соответствующий пайплайн с heartbeat, 5) завершает задачу: `succeeded`/`failed`/`canceled` или возвращает в очередь на ретрай. - Реапер: фоновая задача, периодически возвращает «потерянные» running‑задачи в `queued`. ## Структура ``` src/dataloader/ ├── __main__.py # Запуск uvicorn, lifecycle через FastAPI lifespan ├── config.py # Pydantic-настройки (app/log/pg/worker) ├── context.py # AppContext: engine, sessionmaker, логгер, DI │ ├── api/ │ ├── __init__.py # FastAPI app, middleware, routers, lifespan │ ├── middleware.py # Логирование запросов + метрики/аудит │ ├── os_router.py # /health, /info (инфраструктурные ручки) │ ├── metric_router.py # Примеры метрик (like/dislike) │ │ │ └── v1/ │ ├── router.py # /api/v1/jobs: trigger, status, cancel │ ├── service.py # Бизнес-логика поверх репозитория │ ├── schemas.py # Pydantic DTO API │ └── utils.py # Утилиты (генерация UUID и т.д.) │ ├── storage/ │ ├── engine.py # AsyncEngine, sessionmaker, schema_translate_map │ ├── notify_listener.py # asyncpg LISTEN/NOTIFY по каналу dl_jobs │ │ │ ├── models/ │ │ ├── base.py # Declarative base │ │ └── queue.py # ORM-модели DLJob, DLJobEvent │ │ │ ├── repositories/ │ │ └── queue.py # QueueRepository (CRUD операции) │ │ │ └── schemas/ │ └── queue.py # CreateJobRequest, JobStatus │ ├── workers/ │ ├── base.py # PGWorker: главный цикл, heartbeat, вызов пайплайнов │ ├── manager.py # WorkerManager: запуск/остановка + reaper │ ├── reaper.py # Requeue_lost на базе репозитория │ │ │ └── pipelines/ │ ├── __init__.py # Автозагрузка модулей для регистрации │ ├── registry.py # Реестр обработчиков задач (@register) │ └── noop.py # Пример эталонного пайплайна │ └── logger/ └── ... # Логирование ``` ## Взаимодействие с БД - Подключение: `postgresql+asyncpg` через SQLAlchemy AsyncEngine. - Схемы: логическое имя `queue` маппится на реальную через `schema_translate_map` (см. `engine.py`), имя реальной схемы задаётся `PG_SCHEMA_QUEUE`. - DDL: см. `DDL.sql`. Ключевые элементы: - `dl_jobs` с индексами на claim и running‑lease, - `dl_job_events` как журнал событий, - триггер `notify_job_ready()` + `LISTEN dl_jobs` для пробуждения воркеров. - Конкуренция: claim через `FOR UPDATE SKIP LOCKED`, взаимное исключение по бизнес‑сущности через advisory‑lock `pg_try_advisory_lock(hashtext(lock_key))`. - Надёжность: at‑least‑once. Пайплайны должны быть идемпотентны в части записи в целевые таблицы. ## HTTP API (v1) - POST `/api/v1/jobs/trigger` - Вход: `{queue, task, args?, idempotency_key?, lock_key, partition_key?, priority?, available_at?, max_attempts?, lease_ttl_sec?, producer?, consumer_group?}` - Выход: `{job_id: UUID, status: str}` - Идемпотентность по `idempotency_key` (если задан). - GET `/api/v1/jobs/{job_id}/status` - Выход: `{job_id, status, attempt, started_at?, finished_at?, heartbeat_at?, error?, progress: {}}` - POST `/api/v1/jobs/{job_id}/cancel` - Выход: как в status - Поведение: устанавливает `cancel_requested = true`; воркер завершает между шагами пайплайна. Инфраструктурные: `/health`, `/info`. ## Воркеры, пайплайны и добавление новых ETL‑задач ### Как работает воркер 1. Ожидает сигнал (LISTEN/NOTIFY) или таймаут `DL_CLAIM_BACKOFF_SEC`. 2. Пытается забрать одну задачу своей очереди: `status='queued' AND available_at<=now()` с `FOR UPDATE SKIP LOCKED`. 3. Переводит в `running`, увеличивает `attempt`, выставляет `lease_expires_at`, делает heartbeat каждые `DL_HEARTBEAT_SEC`. 4. Захватывает advisory‑lock по `lock_key` (если не получилось - возвращает в `queued` с бэкоффом). 5. Выполняет пайплайн (`task`) с поддержкой итеративных шагов и кооперативной отмены. 6. По завершении: `succeeded` или `failed`/`canceled`; при ошибках возможны ретраи до `max_attempts`. ### Интерфейс пайплайна Пайплайн - обычная функция, возвращающая одно из: - асинхронный генератор шагов (рекомендуется для длинных процессов), - корутину, - синхронную функцию. Каждый «yield» в асинхронном генераторе - безопасная точка, где воркер выполнит heartbeat и проверит `cancel_requested`. Регистрация нового пайплайна через декоратор `@register("task_name")` в модуле `src/dataloader/workers/pipelines/.py`. Пример: ```python from __future__ import annotations import asyncio from typing import AsyncIterator from dataloader.workers.pipelines.registry import register @register("load.customers") async def load_customers(args: dict) -> AsyncIterator[None]: # шаг 1 – вытягиваем данные await some_fetch(args) yield # шаг 2 – пишем в БД идемпотентно (upsert/merge) await upsert_customers(args) yield # шаг 3 – пост‑обработка await finalize(args) yield ``` Важно: - Пайплайны должны быть идемпотентны (повторный запуск не должен ломать данные). - Долгие операции разбивайте на шаги с `yield`, чтобы работал heartbeat и отмена. - Для бизнес‑взаимного исключения выбирайте корректный `lock_key` (например, `customer:{id}`), чтобы параллельные задачи не конфликтовали. ### Добавление ETL‑задачи (шаги) 1. Создать модуль в `workers/pipelines/` и зарегистрировать обработчик через `@register`. 2. Убедиться, что модуль импортируется автоматически (происходит на старте через `load_all()`). 3. При необходимости расширить схему `args` и валидацию на стороне продьюсера (кто вызывает API). 4. При постановке задачи в `POST /api/v1/jobs/trigger` указать `task` с новым именем и желаемые `args`. 5. При необходимости завести отдельную очередь (`queue`) и добавить её в `WORKERS_JSON` с нужной `concurrency`. ## Логирование, метрики, аудит - Логи: структурированные, через `logger/*`. Middleware (`api/middleware.py`) логирует входящие запросы/исходящие ответы, время обработки, пишет метрики и аудит‑события. - Метрики: простые счётчики (пример: likes/dislikes, requests_total, responses_total, duration_ms). - Аудит: запись бизнес‑событий начала/окончания обработки запроса. ## Тестирование - Юнит‑тесты: `tests/unit/*` - Интеграционные тесты: `tests/integration_tests/*` - покрывают API, репозиторий очереди, reaper. - Запуск: ```bash poetry run pytest -q ``` ## Эксплуатация и масштабирование - Один процесс FastAPI с пулом асинхронных воркеров внутри. Масштабирование - количеством реплик (Deployment). Очередь в БД и advisory‑lock обеспечат корректность при гонках между репликами. - Readiness/Liveness - `/health`. - Корректное завершение: при SIGTERM менеджер подаёт сигнал остановки, завершает воркеры и reaper, закрывает соединения. ## Лицензия Внутренний корпоративный проект.