14 KiB
Dataloader
Асинхронный сервис постановки и исполнения ETL‑задач поверх очереди в PostgreSQL. Предоставляет HTTP API для триггера задач, мониторинга статуса и отмены. Внутри процесса поднимает пул воркеров, которые конкурируют за задачи через SELECT … FOR UPDATE SKIP LOCKED и обрабатывают их с учётом lease/heartbeat и кооперативной отмены. Для пробуждения воркеров используется LISTEN/NOTIFY.
Содержание
- О проекте
- Быстрый старт
- Конфигурация окружения
- Архитектура и потоки данных
- Структура проекта
- Взаимодействие с БД
- HTTP API
- Воркеры, пайплайны и добавление новых ETL‑задач
- Логирование, метрики, аудит
- Тестирование
- Эксплуатация и масштабирование
О проекте
Сервис решает типовую задачу фоновой обработки задач: один общий пул воркеров, одна очередь в БД, несколько типов задач (пайплайнов), идемпотентность, повторные попытки, контроль конкуренции через advisory‑lock по lock_key, кооперативная отмена, возврат «потерянных» задач.
Быстрый старт
- Установить зависимости (poetry):
poetry install - Подготовить PostgreSQL (см. DDL в
DDL.sql) и переменные окружения (см. «Конфигурация»). - Запуск сервиса:
poetry run python -m dataloader - Проверка доступности:
curl http://localhost:8081/health - Пример постановки задачи:
curl -X POST http://localhost:8081/api/v1/jobs/trigger \ -H "Content-Type: application/json" \ -d '{ "queue": "etl.default", "task": "noop", "args": {"sleep1": 1, "sleep2": 1, "sleep3": 1}, "lock_key": "customer:42", "priority": 100 }'
Конфигурация окружения
Настройки собираются в src/dataloader/config.py через Pydantic Settings.
-
Приложение (
AppSettings):APP_HOST(def:0.0.0.0)APP_PORT(def:8081)TIMEZONE(def:Europe/Moscow)
-
Логирование (
LogSettings):LOG_PATH,LOG_FILE_NAME,LOG_ROTATIONMETRIC_PATH,METRIC_FILE_NAMEAUDIT_LOG_PATH,AUDIT_LOG_FILE_NAMEDEBUGпереключает уровень на DEBUG
-
PostgreSQL (
PGSettings):PG_HOST,PG_PORT,PG_USER,PG_PASSWORD,PG_DATABASEPG_SCHEMA_QUEUE- схема таблиц очереди (логическая, маппится черезschema_translate_map)- Параметры пула:
PG_USE_POOL,PG_POOL_SIZE,PG_MAX_OVERFLOW,PG_POOL_RECYCLE - Таймауты:
PG_CONNECT_TIMEOUT,PG_COMMAND_TIMEOUT
-
Воркеры (
WorkerSettings):WORKERS_JSON- список конфигураций воркеров, например:[{"queue":"etl.default","concurrency":2}]DL_HEARTBEAT_SEC(def: 10)DL_DEFAULT_LEASE_TTL_SEC(def: 60)DL_REAPER_PERIOD_SEC(def: 10)DL_CLAIM_BACKOFF_SEC(def: 15)
Архитектура и потоки данных
- HTTP слой: FastAPI‑приложение (
dataloader.api) с v1 API и инфраструктурными маршрутами (/health,/info). - Контекст:
AppContextинициализирует логирование,AsyncEngine,async_sessionmakerи предоставляет DI (get_session). - Очередь: одна таблица
dl_jobsи журналdl_job_eventsв PostgreSQL. Идемпотентность на уровнеidempotency_key. Пробуждение воркеров через триггеры NOTIFY в БД и listener на стороне приложения. - Воркеры:
WorkerManagerподнимает N асинхронных воркеров (PGWorker) на основанииWORKERS_JSON. Каждый воркер:- ждёт уведомление (LISTEN/NOTIFY) или таймаут,
- пытается «захватить» одну задачу (SELECT … FOR UPDATE SKIP LOCKED),
- выставляет
running, получает advisory‑lock поlock_key, - исполняет соответствующий пайплайн с heartbeat,
- завершает задачу:
succeeded/failed/canceledили возвращает в очередь на ретрай.
- Реапер: фоновая задача, периодически возвращает «потерянные» running‑задачи в
queued.
Структура
src/dataloader/
├── __main__.py # Запуск uvicorn, lifecycle через FastAPI lifespan
├── config.py # Pydantic-настройки (app/log/pg/worker)
├── context.py # AppContext: engine, sessionmaker, логгер, DI
│
├── api/
│ ├── __init__.py # FastAPI app, middleware, routers, lifespan
│ ├── middleware.py # Логирование запросов + метрики/аудит
│ ├── os_router.py # /health, /info (инфраструктурные ручки)
│ ├── metric_router.py # Примеры метрик (like/dislike)
│ │
│ └── v1/
│ ├── router.py # /api/v1/jobs: trigger, status, cancel
│ ├── service.py # Бизнес-логика поверх репозитория
│ ├── schemas.py # Pydantic DTO API
│ └── utils.py # Утилиты (генерация UUID и т.д.)
│
├── storage/
│ ├── engine.py # AsyncEngine, sessionmaker, schema_translate_map
│ ├── notify_listener.py # asyncpg LISTEN/NOTIFY по каналу dl_jobs
│ │
│ ├── models/
│ │ ├── base.py # Declarative base
│ │ └── queue.py # ORM-модели DLJob, DLJobEvent
│ │
│ ├── repositories/
│ │ └── queue.py # QueueRepository (CRUD операции)
│ │
│ └── schemas/
│ └── queue.py # CreateJobRequest, JobStatus
│
├── workers/
│ ├── base.py # PGWorker: главный цикл, heartbeat, вызов пайплайнов
│ ├── manager.py # WorkerManager: запуск/остановка + reaper
│ ├── reaper.py # Requeue_lost на базе репозитория
│ │
│ └── pipelines/
│ ├── __init__.py # Автозагрузка модулей для регистрации
│ ├── registry.py # Реестр обработчиков задач (@register)
│ └── noop.py # Пример эталонного пайплайна
│
└── logger/
└── ... # Логирование
Взаимодействие с БД
- Подключение:
postgresql+asyncpgчерез SQLAlchemy AsyncEngine. - Схемы: логическое имя
queueмаппится на реальную черезschema_translate_map(см.engine.py), имя реальной схемы задаётсяPG_SCHEMA_QUEUE. - DDL: см.
DDL.sql. Ключевые элементы:dl_jobsс индексами на claim и running‑lease,dl_job_eventsкак журнал событий,- триггер
notify_job_ready()+LISTEN dl_jobsдля пробуждения воркеров.
- Конкуренция: claim через
FOR UPDATE SKIP LOCKED, взаимное исключение по бизнес‑сущности через advisory‑lockpg_try_advisory_lock(hashtext(lock_key)). - Надёжность: at‑least‑once. Пайплайны должны быть идемпотентны в части записи в целевые таблицы.
HTTP API (v1)
-
POST
/api/v1/jobs/trigger- Вход:
{queue, task, args?, idempotency_key?, lock_key, partition_key?, priority?, available_at?, max_attempts?, lease_ttl_sec?, producer?, consumer_group?} - Выход:
{job_id: UUID, status: str} - Идемпотентность по
idempotency_key(если задан).
- Вход:
-
GET
/api/v1/jobs/{job_id}/status- Выход:
{job_id, status, attempt, started_at?, finished_at?, heartbeat_at?, error?, progress: {}}
- Выход:
-
POST
/api/v1/jobs/{job_id}/cancel- Выход: как в status
- Поведение: устанавливает
cancel_requested = true; воркер завершает между шагами пайплайна.
Инфраструктурные: /health, /info.
Воркеры, пайплайны и добавление новых ETL‑задач
Как работает воркер
- Ожидает сигнал (LISTEN/NOTIFY) или таймаут
DL_CLAIM_BACKOFF_SEC. - Пытается забрать одну задачу своей очереди:
status='queued' AND available_at<=now()сFOR UPDATE SKIP LOCKED. - Переводит в
running, увеличиваетattempt, выставляетlease_expires_at, делает heartbeat каждыеDL_HEARTBEAT_SEC. - Захватывает advisory‑lock по
lock_key(если не получилось - возвращает вqueuedс бэкоффом). - Выполняет пайплайн (
task) с поддержкой итеративных шагов и кооперативной отмены. - По завершении:
succeededилиfailed/canceled; при ошибках возможны ретраи доmax_attempts.
Интерфейс пайплайна
Пайплайн - обычная функция, возвращающая одно из:
- асинхронный генератор шагов (рекомендуется для длинных процессов),
- корутину,
- синхронную функцию.
Каждый «yield» в асинхронном генераторе - безопасная точка, где воркер выполнит heartbeat и проверит cancel_requested.
Регистрация нового пайплайна через декоратор @register("task_name") в модуле src/dataloader/workers/pipelines/<your_task>.py.
Пример:
from __future__ import annotations
import asyncio
from typing import AsyncIterator
from dataloader.workers.pipelines.registry import register
@register("load.customers")
async def load_customers(args: dict) -> AsyncIterator[None]:
# шаг 1 – вытягиваем данные
await some_fetch(args)
yield
# шаг 2 – пишем в БД идемпотентно (upsert/merge)
await upsert_customers(args)
yield
# шаг 3 – пост‑обработка
await finalize(args)
yield
Важно:
- Пайплайны должны быть идемпотентны (повторный запуск не должен ломать данные).
- Долгие операции разбивайте на шаги с
yield, чтобы работал heartbeat и отмена. - Для бизнес‑взаимного исключения выбирайте корректный
lock_key(например,customer:{id}), чтобы параллельные задачи не конфликтовали.
Добавление ETL‑задачи (шаги)
- Создать модуль в
workers/pipelines/и зарегистрировать обработчик через@register. - Убедиться, что модуль импортируется автоматически (происходит на старте через
load_all()). - При необходимости расширить схему
argsи валидацию на стороне продьюсера (кто вызывает API). - При постановке задачи в
POST /api/v1/jobs/triggerуказатьtaskс новым именем и желаемыеargs. - При необходимости завести отдельную очередь (
queue) и добавить её вWORKERS_JSONс нужнойconcurrency.
Логирование, метрики, аудит
- Логи: структурированные, через
logger/*. Middleware (api/middleware.py) логирует входящие запросы/исходящие ответы, время обработки, пишет метрики и аудит‑события. - Метрики: простые счётчики (пример: likes/dislikes, requests_total, responses_total, duration_ms).
- Аудит: запись бизнес‑событий начала/окончания обработки запроса.
Тестирование
- Юнит‑тесты:
tests/unit/* - Интеграционные тесты:
tests/integration_tests/*- покрывают API, репозиторий очереди, reaper. - Запуск:
poetry run pytest -q
Эксплуатация и масштабирование
- Один процесс FastAPI с пулом асинхронных воркеров внутри. Масштабирование - количеством реплик (Deployment). Очередь в БД и advisory‑lock обеспечат корректность при гонках между репликами.
- Readiness/Liveness -
/health. - Корректное завершение: при SIGTERM менеджер подаёт сигнал остановки, завершает воркеры и reaper, закрывает соединения.
Лицензия
Внутренний корпоративный проект.