# Dataloader Асинхронный сервис постановки и исполнения ETL‑задач поверх очереди в PostgreSQL. Предоставляет HTTP API для триггера задач, мониторинга статуса и отмены. Внутри процесса поднимает пул воркеров, которые конкурируют за задачи через SELECT … FOR UPDATE SKIP LOCKED и обрабатывают их с учётом lease/heartbeat и кооперативной отмены. Для пробуждения воркеров используется LISTEN/NOTIFY. ## Содержание - О проекте - Быстрый старт - Конфигурация окружения - Архитектура и потоки данных - Структура проекта - Взаимодействие с БД - HTTP API - Воркеры, пайплайны и добавление новых ETL‑задач - Логирование, метрики, аудит - Тестирование - Эксплуатация и масштабирование ## О проекте Сервис решает типовую задачу фоновой обработки задач: один общий пул воркеров, одна очередь в БД, несколько типов задач (пайплайнов), идемпотентность, повторные попытки, контроль конкуренции через advisory‑lock по `lock_key`, кооперативная отмена, возврат «потерянных» задач. ## Быстрый старт 1. Установить зависимости (poetry): ```bash poetry install ``` 2. Подготовить PostgreSQL (см. DDL в `DDL.sql`) и переменные окружения (см. «Конфигурация»). 3. Запуск сервиса: ```bash poetry run python -m dataloader ``` 4. Проверка доступности: ```bash curl http://localhost:8081/health ``` 5. Пример постановки задачи: ```bash curl -X POST http://localhost:8081/api/v1/jobs/trigger \ -H "Content-Type: application/json" \ -d '{ "queue": "etl.default", "task": "noop", "args": {"sleep1": 1, "sleep2": 1, "sleep3": 1}, "lock_key": "customer:42", "priority": 100 }' ``` ## Конфигурация окружения Настройки собираются в `src/dataloader/config.py` через Pydantic Settings. - Приложение (`AppSettings`): - `APP_HOST` (def: `0.0.0.0`) - `APP_PORT` (def: `8081`) - `TIMEZONE` (def: `Europe/Moscow`) - Логирование (`LogSettings`): - `LOG_PATH`, `LOG_FILE_NAME`, `LOG_ROTATION` - `METRIC_PATH`, `METRIC_FILE_NAME` - `AUDIT_LOG_PATH`, `AUDIT_LOG_FILE_NAME` - `DEBUG` переключает уровень на DEBUG - PostgreSQL (`PGSettings`): - `PG_HOST`, `PG_PORT`, `PG_USER`, `PG_PASSWORD`, `PG_DATABASE` - `PG_SCHEMA_QUEUE` - схема таблиц очереди (логическая, маппится через `schema_translate_map`) - Параметры пула: `PG_USE_POOL`, `PG_POOL_SIZE`, `PG_MAX_OVERFLOW`, `PG_POOL_RECYCLE` - Таймауты: `PG_CONNECT_TIMEOUT`, `PG_COMMAND_TIMEOUT` - Воркеры (`WorkerSettings`): - `WORKERS_JSON` - список конфигураций воркеров, например: `[{"queue":"etl.default","concurrency":2}]` - `DL_HEARTBEAT_SEC` (def: 10) - `DL_DEFAULT_LEASE_TTL_SEC` (def: 60) - `DL_REAPER_PERIOD_SEC` (def: 10) - `DL_CLAIM_BACKOFF_SEC` (def: 15) ## Архитектура и потоки данных - HTTP слой: FastAPI‑приложение (`dataloader.api`) с v1 API и инфраструктурными маршрутами (`/health`, `/info`). - Контекст: `AppContext` инициализирует логирование, `AsyncEngine`, `async_sessionmaker` и предоставляет DI (`get_session`). - Очередь: одна таблица `dl_jobs` и журнал `dl_job_events` в PostgreSQL. Идемпотентность на уровне `idempotency_key`. Пробуждение воркеров через триггеры NOTIFY в БД и listener на стороне приложения. - Воркеры: `WorkerManager` поднимает N асинхронных воркеров (`PGWorker`) на основании `WORKERS_JSON`. Каждый воркер: 1) ждёт уведомление (LISTEN/NOTIFY) или таймаут, 2) пытается «захватить» одну задачу (SELECT … FOR UPDATE SKIP LOCKED), 3) выставляет `running`, получает advisory‑lock по `lock_key`, 4) исполняет соответствующий пайплайн с heartbeat, 5) завершает задачу: `succeeded`/`failed`/`canceled` или возвращает в очередь на ретрай. - Реапер: фоновая задача, периодически возвращает «потерянные» running‑задачи в `queued`. ## Структура ``` src/dataloader/ ├── __main__.py # Запуск uvicorn, lifecycle через FastAPI lifespan ├── config.py # Pydantic-настройки (app/log/pg/worker) ├── context.py # AppContext: engine, sessionmaker, логгер, DI │ ├── api/ │ ├── __init__.py # FastAPI app, middleware, routers, lifespan │ ├── middleware.py # Логирование запросов + метрики/аудит │ ├── os_router.py # /health, /info (инфраструктурные ручки) │ ├── metric_router.py # Примеры метрик (like/dislike) │ │ │ └── v1/ │ ├── router.py # /api/v1/jobs: trigger, status, cancel │ ├── service.py # Бизнес-логика поверх репозитория │ ├── schemas.py # Pydantic DTO API │ └── utils.py # Утилиты (генерация UUID и т.д.) │ ├── storage/ │ ├── engine.py # AsyncEngine, sessionmaker, schema_translate_map │ ├── notify_listener.py # asyncpg LISTEN/NOTIFY по каналу dl_jobs │ │ │ ├── models/ │ │ ├── base.py # Declarative base │ │ └── queue.py # ORM-модели DLJob, DLJobEvent │ │ │ ├── repositories/ │ │ └── queue.py # QueueRepository (CRUD операции) │ │ │ └── schemas/ │ └── queue.py # CreateJobRequest, JobStatus │ ├── workers/ │ ├── base.py # PGWorker: главный цикл, heartbeat, вызов пайплайнов │ ├── manager.py # WorkerManager: запуск/остановка + reaper │ ├── reaper.py # Requeue_lost на базе репозитория │ │ │ └── pipelines/ │ ├── __init__.py # Автозагрузка модулей для регистрации │ ├── registry.py # Реестр обработчиков задач (@register) │ └── noop.py # Пример эталонного пайплайна │ └── logger/ └── ... # Логирование ``` ## Взаимодействие с БД - Подключение: `postgresql+asyncpg` через SQLAlchemy AsyncEngine. - Схемы: логическое имя `queue` маппится на реальную через `schema_translate_map` (см. `engine.py`), имя реальной схемы задаётся `PG_SCHEMA_QUEUE`. - DDL: см. `DDL.sql`. Ключевые элементы: - `dl_jobs` с индексами на claim и running‑lease, - `dl_job_events` как журнал событий, - триггер `notify_job_ready()` + `LISTEN dl_jobs` для пробуждения воркеров. - Конкуренция: claim через `FOR UPDATE SKIP LOCKED`, взаимное исключение по бизнес‑сущности через advisory‑lock `pg_try_advisory_lock(hashtext(lock_key))`. - Надёжность: at‑least‑once. Пайплайны должны быть идемпотентны в части записи в целевые таблицы. ## HTTP API (v1) ### POST `/api/v1/jobs/trigger` Постановка задачи в очередь (идемпотентная операция). **Request:** ```json { "queue": "load.cbr", // обязательно: имя очереди "task": "load.cbr.rates", // обязательно: имя задачи для registry "args": { // опционально: аргументы задачи "date": "2025-01-10", "currencies": ["USD", "EUR"] }, "idempotency_key": "cbr_2025-01-10", // опционально: ключ идемпотентности "lock_key": "cbr_rates", // обязательно: ключ для advisory lock "partition_key": "2025-01-10", // опционально: ключ партиционирования "priority": 100, // опционально: приоритет (меньше = выше) "available_at": "2025-01-10T00:00:00Z", // опционально: отложенный запуск "max_attempts": 3, // опционально: макс попыток (def: 5) "lease_ttl_sec": 300, // опционально: TTL аренды (def: 60) "producer": "api-client", // опционально: кто поставил "consumer_group": "cbr-loaders" // опционально: группа потребителей } ``` **Response 200:** ```json { "job_id": "550e8400-e29b-41d4-a716-446655440000", "status": "queued" } ``` **Коды ответов:** - `200 OK` - задача создана или уже существует (идемпотентность) - `400 Bad Request` - невалидные данные - `500 Internal Server Error` - ошибка сервера ### GET `/api/v1/jobs/{job_id}/status` Получение статуса задачи. **Response 200:** ```json { "job_id": "550e8400-e29b-41d4-a716-446655440000", "status": "running", // queued/running/succeeded/failed/canceled "attempt": 1, // текущая попытка "started_at": "2025-01-10T12:00:00Z", // время первого запуска "finished_at": null, // время завершения (если есть) "heartbeat_at": "2025-01-10T12:01:30Z", // последний heartbeat "error": null, // текст ошибки (если есть) "progress": { // прогресс выполнения (custom) "processed": 500, "total": 1000 } } ``` **Коды ответов:** - `200 OK` - статус получен - `404 Not Found` - задача не найдена ### POST `/api/v1/jobs/{job_id}/cancel` Запрос кооперативной отмены задачи. **Response 200:** ```json { "job_id": "550e8400-e29b-41d4-a716-446655440000", "status": "running", "attempt": 1, "started_at": "2025-01-10T12:00:00Z", "heartbeat_at": "2025-01-10T12:01:30Z" } ``` **Поведение:** - Устанавливает флаг `cancel_requested = true` в БД - Воркер проверяет флаг между `yield` в пайплайне - При обнаружении флага воркер завершает задачу со статусом `canceled` **Коды ответов:** - `200 OK` - запрос отмены принят - `404 Not Found` - задача не найдена ### Инфраструктурные эндпоинты **GET `/health`** - проверка работоспособности (без БД, < 20ms) ```json {"status": "healthy"} ``` **GET `/info`** - информация о сервисе ```json { "service": "dataloader", "version": "1.0.0", "environment": "production" } ``` ## Воркеры, пайплайны и добавление новых ETL‑задач ### Как работает воркер 1. Ожидает сигнал (LISTEN/NOTIFY) или таймаут `DL_CLAIM_BACKOFF_SEC`. 2. Пытается забрать одну задачу своей очереди: `status='queued' AND available_at<=now()` с `FOR UPDATE SKIP LOCKED`. 3. Переводит в `running`, увеличивает `attempt`, выставляет `lease_expires_at`, делает heartbeat каждые `DL_HEARTBEAT_SEC`. 4. Захватывает advisory‑lock по `lock_key` (если не получилось - возвращает в `queued` с бэкоффом). 5. Выполняет пайплайн (`task`) с поддержкой итеративных шагов и кооперативной отмены. 6. По завершении: `succeeded` или `failed`/`canceled`; при ошибках возможны ретраи до `max_attempts`. ### Протокол выполнения задачи (SQL) **Claim (захват задачи):** ```sql WITH cte AS ( SELECT job_id FROM dl_jobs WHERE status = 'queued' AND queue = :queue AND available_at <= now() ORDER BY priority ASC, created_at ASC FOR UPDATE SKIP LOCKED LIMIT 1 ) UPDATE dl_jobs j SET status = 'running', started_at = COALESCE(started_at, now()), attempt = attempt + 1, lease_expires_at = now() + make_interval(secs => j.lease_ttl_sec), heartbeat_at = now() FROM cte WHERE j.job_id = cte.job_id RETURNING j.job_id, j.task, j.args, j.lock_key, j.lease_ttl_sec; ``` **Heartbeat (продление аренды):** ```sql UPDATE dl_jobs SET heartbeat_at = now(), lease_expires_at = now() + make_interval(secs => :ttl) WHERE job_id = :job_id AND status = 'running' RETURNING cancel_requested; ``` **Завершение успешное:** ```sql UPDATE dl_jobs SET status = 'succeeded', finished_at = now(), lease_expires_at = NULL WHERE job_id = :job_id; SELECT pg_advisory_unlock(hashtext(:lock_key)); ``` **Завершение с ошибкой (retry):** ```sql UPDATE dl_jobs SET status = CASE WHEN attempt < max_attempts THEN 'queued' ELSE 'failed' END, available_at = CASE WHEN attempt < max_attempts THEN now() + make_interval(secs => 30 * attempt) ELSE now() END, error = :error_message, lease_expires_at = NULL, finished_at = CASE WHEN attempt >= max_attempts THEN now() ELSE NULL END WHERE job_id = :job_id; SELECT pg_advisory_unlock(hashtext(:lock_key)); ``` **Reaper (возврат потерянных задач):** ```sql UPDATE dl_jobs SET status = 'queued', available_at = now(), lease_expires_at = NULL WHERE status = 'running' AND lease_expires_at IS NOT NULL AND lease_expires_at < now() RETURNING job_id; ``` ### Интерфейс пайплайна Пайплайн - обычная функция, возвращающая одно из: - асинхронный генератор шагов (рекомендуется для длинных процессов), - корутину, - синхронную функцию. Каждый «yield» в асинхронном генераторе - безопасная точка, где воркер выполнит heartbeat и проверит `cancel_requested`. Регистрация нового пайплайна через декоратор `@register("task_name")` в модуле `src/dataloader/workers/pipelines/.py`. Пример: ```python from __future__ import annotations import asyncio from typing import AsyncIterator from dataloader.workers.pipelines.registry import register @register("load.customers") async def load_customers(args: dict) -> AsyncIterator[None]: # шаг 1 – вытягиваем данные await some_fetch(args) yield # шаг 2 – пишем в БД идемпотентно (upsert/merge) await upsert_customers(args) yield # шаг 3 – пост‑обработка await finalize(args) yield ``` Важно: - Пайплайны должны быть идемпотентны (повторный запуск не должен ломать данные). - Долгие операции разбивайте на шаги с `yield`, чтобы работал heartbeat и отмена. - Для бизнес‑взаимного исключения выбирайте корректный `lock_key` (например, `customer:{id}`), чтобы параллельные задачи не конфликтовали. ### Добавление ETL‑задачи (шаги) **1. Создать пайплайн** в `src/dataloader/workers/pipelines/`: ```python # src/dataloader/workers/pipelines/load_cbr_rates.py from __future__ import annotations from typing import AsyncIterator from datetime import datetime from dataloader.workers.pipelines.registry import register @register("load.cbr.rates") async def load_cbr_rates(args: dict) -> AsyncIterator[None]: """ Загрузка курсов валют ЦБ РФ. Args: args: {"date": "YYYY-MM-DD", "currencies": ["USD", "EUR"]} """ date = datetime.fromisoformat(args["date"]) currencies = args.get("currencies", ["USD", "EUR"]) # Извлечение данных data = await fetch_cbr_rates(date, currencies) yield # Heartbeat checkpoint # Трансформация transformed = transform_rates(data) yield # Heartbeat checkpoint # Идемпотентная загрузка в БД async with get_target_session() as session: await session.execute( insert(CbrRates) .values(transformed) .on_conflict_do_update( index_elements=["date", "currency"], set_={"rate": excluded.c.rate, "updated_at": func.now()} ) ) await session.commit() yield ``` **2. Настроить воркеры** в `.env`: ```bash WORKERS_JSON='[{"queue":"load.cbr","concurrency":2}]' ``` **3. Поставить задачу через API**: ```bash curl -X POST http://localhost:8081/api/v1/jobs/trigger \ -H "Content-Type: application/json" \ -d '{ "queue": "load.cbr", "task": "load.cbr.rates", "args": {"date": "2025-01-10", "currencies": ["USD", "EUR"]}, "lock_key": "cbr_rates_2025-01-10", "partition_key": "2025-01-10", "priority": 100, "max_attempts": 3, "lease_ttl_sec": 300 }' ``` **4. Мониторить выполнение**: ```bash # Получить job_id из ответа и проверить статус curl http://localhost:8081/api/v1/jobs/{job_id}/status ``` **Ключевые моменты**: - Пайплайн должен быть идемпотентным (повторный запуск не должен ломать данные) - Используйте `yield` после каждого значимого чанка работы для heartbeat - `lock_key` должен обеспечивать взаимное исключение (например, `customer:{id}`) - `partition_key` используется для параллелизации независимых задач ## Логирование, метрики, аудит - Логи: структурированные, через `logger/*`. Middleware (`api/middleware.py`) логирует входящие запросы/исходящие ответы, время обработки, пишет метрики и аудит‑события. - Метрики: простые счётчики (пример: likes/dislikes, requests_total, responses_total, duration_ms). - Аудит: запись бизнес‑событий начала/окончания обработки запроса. ## Тестирование ### Структура тестов ``` tests/ ├── conftest.py # Глобальные фикстуры (db_engine, db_session, client) ├── integration_tests/ # Интеграционные тесты с реальной БД │ ├── test_queue_repository.py # 12 тестов репозитория │ └── test_api_endpoints.py # 7 тестов API endpoints └── unit/ # Юнит-тесты с моками (92 теста) ├── test_config.py # 30 тестов конфигурации ├── test_context.py # 13 тестов AppContext ├── test_api_service.py # 10 тестов сервисного слоя ├── test_notify_listener.py # 13 тестов LISTEN/NOTIFY ├── test_workers_base.py # 14 тестов PGWorker ├── test_workers_manager.py # 10 тестов WorkerManager └── test_pipeline_registry.py # 5 тестов реестра пайплайнов ``` ### Запуск тестов ```bash # Все тесты (111 тестов) poetry run pytest # Только юнит-тесты poetry run pytest tests/unit/ -m unit # Только интеграционные poetry run pytest tests/integration_tests/ -m integration # С покрытием кода poetry run pytest --cov=dataloader --cov-report=html # С подробным выводом poetry run pytest -v -s ``` ### Покрытие кода Текущее покрытие: **91%** (788 строк / 715 покрыто) ``` Name Stmts Miss Cover --------------------------------------------------------------- src/dataloader/config.py 79 0 100% src/dataloader/context.py 39 0 100% src/dataloader/api/v1/service.py 32 0 100% src/dataloader/storage/models/queue.py 43 0 100% src/dataloader/storage/schemas/queue.py 29 0 100% src/dataloader/storage/notify_listener.py 49 0 100% src/dataloader/workers/base.py 102 3 97% src/dataloader/workers/manager.py 64 0 100% src/dataloader/storage/repositories/queue 130 12 91% --------------------------------------------------------------- TOTAL 788 73 91% ``` ### Ключевые тест-сценарии **Интеграционные тесты:** - Постановка задачи через API → проверка статуса - Идемпотентность через `idempotency_key` - Claim задачи → heartbeat → успешное завершение - Claim задачи → ошибка → retry → финальный fail - Конкуренция воркеров через advisory lock - Возврат потерянных задач (reaper) - Отмена задачи пользователем **Юнит-тесты:** - Конфигурация из переменных окружения - Создание и управление воркерами - LISTEN/NOTIFY механизм - Сервисный слой и репозиторий - Протокол heartbeat и отмены ### Масштабирование - **Вертикальное**: Увеличение `concurrency` в `WORKERS_JSON` для существующих воркеров - **Горизонтальное**: Увеличение количества реплик (pods). Очередь в БД и advisory-lock обеспечат корректность при конкуренции между репликами - **По очередям**: Разные deployment'ы для разных очередей с разными ресурсами ### Graceful Shutdown При получении SIGTERM: 1. Останавливает прием новых HTTP запросов 2. Сигнализирует воркерам о необходимости завершения 3. Ждет завершения текущих задач (timeout 30 сек) 4. Останавливает reaper 5. Закрывает соединения с БД ### Мониторинг **Health Checks:** - `GET /health` - проверка работоспособности (без БД, < 20ms) - `GET /info` - информация о версии **Метрики (если включен metric_router):** - Количество задач по статусам (queued, running, succeeded, failed) - Время выполнения задач (p50, p95, p99) - Количество активных воркеров - Частота ошибок **Логи:** Структурированные JSON-логи с уровнями: DEBUG, INFO, WARNING, ERROR, CRITICAL **Ключевые события для алертов:** - `worker.claim.backoff` - частые backoff'ы (возможна конкуренция) - `worker.complete.failed` - высокий процент ошибок - `reaper.requeued` - частый возврат потерянных задач (проблемы с lease) - `api.error` - ошибки API ## Troubleshooting ### Задачи застревают в статусе `queued` **Симптомы:** Задачи не начинают выполняться, остаются в `queued`. **Возможные причины:** 1. Воркеры не запущены или упали 2. Нет воркеров для данной очереди в `WORKERS_JSON` 3. `available_at` в будущем **Решение:** ```bash # Проверить логи воркеров docker logs dataloader | grep worker # Проверить конфигурацию echo $WORKERS_JSON # Проверить задачи в БД SELECT job_id, queue, status, available_at, created_at FROM dl_jobs WHERE status = 'queued' ORDER BY created_at DESC LIMIT 10; ``` ### Задачи часто возвращаются в `queued` (backoff) **Симптомы:** В логах частые события `worker.claim.backoff`. **Причины:** - Конкуренция за `lock_key`: несколько задач с одинаковым `lock_key` одновременно - Advisory lock уже занят другим процессом **Решение:** - Проверить корректность выбора `lock_key` (должен быть уникальным для бизнес-сущности) - Использовать `partition_key` для распределения нагрузки - Снизить `concurrency` для данной очереди ### Высокий процент `failed` задач **Симптомы:** Много задач завершаются с `status = 'failed'`. **Диагностика:** ```sql SELECT job_id, task, error, attempt, max_attempts FROM dl_jobs WHERE status = 'failed' ORDER BY finished_at DESC LIMIT 20; ``` **Возможные причины:** - Ошибки в коде пайплайна - Недоступность внешних сервисов - Таймауты (превышение `lease_ttl_sec`) - Неверные аргументы в `args` **Решение:** - Проверить логи с `job_id` - Увеличить `max_attempts` для retry - Увеличить `lease_ttl_sec` для долгих операций - Исправить код пайплайна ### Медленное выполнение задач **Симптомы:** Задачи выполняются дольше ожидаемого. **Диагностика:** ```sql SELECT task, AVG(EXTRACT(EPOCH FROM (finished_at - started_at))) as avg_duration_sec, COUNT(*) as total FROM dl_jobs WHERE status IN ('succeeded', 'failed') AND finished_at > NOW() - INTERVAL '1 hour' GROUP BY task ORDER BY avg_duration_sec DESC; ``` **Возможные причины:** - Неоптимальный код пайплайна - Медленные внешние сервисы - Недостаточно воркеров (`concurrency` слишком мал) - Проблемы с БД (медленные запросы, блокировки) **Решение:** - Профилировать код пайплайна - Увеличить `concurrency` в `WORKERS_JSON` - Оптимизировать запросы к БД (индексы, batching) - Масштабировать горизонтально (больше реплик) ### Утечка памяти **Симптомы:** Постепенный рост потребления памяти, OOM kills. **Диагностика:** ```bash # Мониторинг памяти kubectl top pods -l app=dataloader --containers # Проверить логи перед падением kubectl logs dataloader-xxx --previous ``` **Возможные причины:** - Накопление объектов в памяти в пайплайне - Незакрытые соединения/файлы - Утечки в зависимостях **Решение:** - Использовать context managers (`async with`) для ресурсов - Обрабатывать данные чанками, не загружать всё в память - Периодически перезапускать воркеры (restart policy) ### Проблемы с LISTEN/NOTIFY **Симптомы:** Воркеры не просыпаются сразу после постановки задачи. **Диагностика:** ```bash # Проверить логи listener docker logs dataloader | grep "notify_listener\|LISTEN" # Проверить триггеры в БД SELECT * FROM pg_trigger WHERE tgname LIKE 'dl_jobs%'; ``` **Возможные причины:** - Триггеры не созданы или отключены - Проблемы с подключением asyncpg - Воркер не подписан на канал **Решение:** - Воркер автоматически fallback'ится на polling при проблемах с LISTEN/NOTIFY - Проверить DDL: триггеры `dl_jobs_notify_ins` и `dl_jobs_notify_upd` - Проверить права пользователя БД на LISTEN/NOTIFY