36 KiB
Dataloader
Асинхронный сервис постановки и исполнения ETL‑задач поверх очереди в PostgreSQL. Предоставляет HTTP API для триггера задач, мониторинга статуса и отмены. Внутри процесса поднимает пул воркеров, которые конкурируют за задачи через SELECT … FOR UPDATE SKIP LOCKED и обрабатывают их с учётом lease/heartbeat и кооперативной отмены. Для пробуждения воркеров используется LISTEN/NOTIFY.
Содержание
- О проекте
- Быстрый старт
- Конфигурация окружения
- Архитектура и потоки данных
- Структура проекта
- Взаимодействие с БД
- HTTP API
- Воркеры, пайплайны и добавление новых ETL‑задач
- Существующие пайплайны
- Логирование, метрики, аудит
- Тестирование
- Эксплуатация и масштабирование
О проекте
Сервис решает типовую задачу фоновой обработки задач: один общий пул воркеров, одна очередь в БД, несколько типов задач (пайплайнов), идемпотентность, повторные попытки, контроль конкуренции через advisory‑lock по lock_key, кооперативная отмена, возврат «потерянных» задач.
Быстрый старт
- Установить зависимости (poetry):
poetry install - Подготовить PostgreSQL (см. DDL в
DDL.sql) и переменные окружения (см. «Конфигурация»). - Запуск сервиса:
poetry run python -m dataloader - Проверка доступности:
curl http://localhost:8081/health - Пример постановки задачи:
curl -X POST http://localhost:8081/api/v1/jobs/trigger \ -H "Content-Type: application/json" \ -d '{ "queue": "etl.default", "task": "noop", "args": {"sleep1": 1, "sleep2": 1, "sleep3": 1}, "lock_key": "customer:42", "priority": 100 }'
Конфигурация окружения
Настройки собираются в src/dataloader/config.py через Pydantic Settings.
-
Приложение (
AppSettings):APP_HOST(def:0.0.0.0)APP_PORT(def:8081)TIMEZONE(def:Europe/Moscow)
-
Логирование (
LogSettings):LOG_PATH,LOG_FILE_NAME,LOG_ROTATIONMETRIC_PATH,METRIC_FILE_NAMEAUDIT_LOG_PATH,AUDIT_LOG_FILE_NAMEDEBUGпереключает уровень на DEBUG
-
PostgreSQL (
PGSettings):PG_HOST,PG_PORT,PG_USER,PG_PASSWORD,PG_DATABASEPG_SCHEMA_QUEUE- схема таблиц очереди (логическая, маппится черезschema_translate_map)- Параметры пула:
PG_USE_POOL,PG_POOL_SIZE,PG_MAX_OVERFLOW,PG_POOL_RECYCLE - Таймауты:
PG_CONNECT_TIMEOUT,PG_COMMAND_TIMEOUT
-
Воркеры (
WorkerSettings):WORKERS_JSON- список конфигураций воркеров, например:[{"queue":"etl.default","concurrency":2}]DL_HEARTBEAT_SEC(def: 10)DL_DEFAULT_LEASE_TTL_SEC(def: 60)DL_REAPER_PERIOD_SEC(def: 10)DL_CLAIM_BACKOFF_SEC(def: 15)
Архитектура и потоки данных
- HTTP слой: FastAPI‑приложение (
dataloader.api) с v1 API и инфраструктурными маршрутами (/health,/info). - Контекст:
AppContextинициализирует логирование,AsyncEngine,async_sessionmakerи предоставляет DI (get_session). - Очередь: одна таблица
dl_jobsи журналdl_job_eventsв PostgreSQL. Идемпотентность на уровнеidempotency_key. Пробуждение воркеров через триггеры NOTIFY в БД и listener на стороне приложения. - Воркеры:
WorkerManagerподнимает N асинхронных воркеров (PGWorker) на основанииWORKERS_JSON. Каждый воркер:- ждёт уведомление (LISTEN/NOTIFY) или таймаут,
- пытается «захватить» одну задачу (SELECT … FOR UPDATE SKIP LOCKED),
- выставляет
running, получает advisory‑lock поlock_key, - исполняет соответствующий пайплайн с heartbeat,
- завершает задачу:
succeeded/failed/canceledили возвращает в очередь на ретрай.
- Реапер: фоновая задача, периодически возвращает «потерянные» running‑задачи в
queued.
Структура
src/dataloader/
├── __main__.py # Запуск uvicorn, lifecycle через FastAPI lifespan
├── config.py # Pydantic-настройки (app/log/pg/worker)
├── context.py # AppContext: engine, sessionmaker, логгер, DI
│
├── api/
│ ├── __init__.py # FastAPI app, middleware, routers, lifespan
│ ├── middleware.py # Логирование запросов + метрики/аудит
│ ├── os_router.py # /health, /info (инфраструктурные ручки)
│ ├── metric_router.py # Примеры метрик (like/dislike)
│ │
│ └── v1/
│ ├── router.py # /api/v1/jobs: trigger, status, cancel
│ ├── service.py # Бизнес-логика поверх репозитория
│ ├── schemas.py # Pydantic DTO API
│ └── utils.py # Утилиты (генерация UUID и т.д.)
│
├── storage/
│ ├── engine.py # AsyncEngine, sessionmaker, schema_translate_map
│ ├── notify_listener.py # asyncpg LISTEN/NOTIFY по каналу dl_jobs
│ │
│ ├── models/
│ │ ├── base.py # Declarative base
│ │ └── queue.py # ORM-модели DLJob, DLJobEvent
│ │
│ ├── repositories/
│ │ └── queue.py # QueueRepository (CRUD операции)
│ │
│ └── schemas/
│ └── queue.py # CreateJobRequest, JobStatus
│
├── workers/
│ ├── base.py # PGWorker: главный цикл, heartbeat, вызов пайплайнов
│ ├── manager.py # WorkerManager: запуск/остановка + reaper
│ ├── reaper.py # Requeue_lost на базе репозитория
│ │
│ └── pipelines/
│ ├── __init__.py # Автозагрузка модулей для регистрации
│ ├── registry.py # Реестр обработчиков задач (@register)
│ └── noop.py # Пример эталонного пайплайна
│
└── logger/
└── ... # Логирование
Взаимодействие с БД
- Подключение:
postgresql+asyncpgчерез SQLAlchemy AsyncEngine. - Схемы: логическое имя
queueмаппится на реальную черезschema_translate_map(см.engine.py), имя реальной схемы задаётсяPG_SCHEMA_QUEUE. - DDL: см.
DDL.sql. Ключевые элементы:dl_jobsс индексами на claim и running‑lease,dl_job_eventsкак журнал событий,- триггер
notify_job_ready()+LISTEN dl_jobsдля пробуждения воркеров.
- Конкуренция: claim через
FOR UPDATE SKIP LOCKED, взаимное исключение по бизнес‑сущности через advisory‑lockpg_try_advisory_lock(hashtext(lock_key)). - Надёжность: at‑least‑once. Пайплайны должны быть идемпотентны в части записи в целевые таблицы.
HTTP API (v1)
POST /api/v1/jobs/trigger
Постановка задачи в очередь (идемпотентная операция).
Request:
{
"queue": "load.cbr", // обязательно: имя очереди
"task": "load.cbr.rates", // обязательно: имя задачи для registry
"args": { // опционально: аргументы задачи
"date": "2025-01-10",
"currencies": ["USD", "EUR"]
},
"idempotency_key": "cbr_2025-01-10", // опционально: ключ идемпотентности
"lock_key": "cbr_rates", // обязательно: ключ для advisory lock
"partition_key": "2025-01-10", // опционально: ключ партиционирования
"priority": 100, // опционально: приоритет (меньше = выше)
"available_at": "2025-01-10T00:00:00Z", // опционально: отложенный запуск
"max_attempts": 3, // опционально: макс попыток (def: 5)
"lease_ttl_sec": 300, // опционально: TTL аренды (def: 60)
"producer": "api-client", // опционально: кто поставил
"consumer_group": "cbr-loaders" // опционально: группа потребителей
}
Response 200:
{
"job_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "queued"
}
Коды ответов:
200 OK- задача создана или уже существует (идемпотентность)400 Bad Request- невалидные данные500 Internal Server Error- ошибка сервера
GET /api/v1/jobs/{job_id}/status
Получение статуса задачи.
Response 200:
{
"job_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "running", // queued/running/succeeded/failed/canceled
"attempt": 1, // текущая попытка
"started_at": "2025-01-10T12:00:00Z", // время первого запуска
"finished_at": null, // время завершения (если есть)
"heartbeat_at": "2025-01-10T12:01:30Z", // последний heartbeat
"error": null, // текст ошибки (если есть)
"progress": { // прогресс выполнения (custom)
"processed": 500,
"total": 1000
}
}
Коды ответов:
200 OK- статус получен404 Not Found- задача не найдена
POST /api/v1/jobs/{job_id}/cancel
Запрос кооперативной отмены задачи.
Response 200:
{
"job_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "running",
"attempt": 1,
"started_at": "2025-01-10T12:00:00Z",
"heartbeat_at": "2025-01-10T12:01:30Z"
}
Поведение:
- Устанавливает флаг
cancel_requested = trueв БД - Воркер проверяет флаг между
yieldв пайплайне - При обнаружении флага воркер завершает задачу со статусом
canceled
Коды ответов:
200 OK- запрос отмены принят404 Not Found- задача не найдена
Инфраструктурные эндпоинты
GET /health - проверка работоспособности (без БД, < 20ms)
{"status": "healthy"}
GET /info - информация о сервисе
{
"service": "dataloader",
"version": "1.0.0",
"environment": "production"
}
Воркеры, пайплайны и добавление новых ETL‑задач
Как работает воркер
- Ожидает сигнал (LISTEN/NOTIFY) или таймаут
DL_CLAIM_BACKOFF_SEC. - Пытается забрать одну задачу своей очереди:
status='queued' AND available_at<=now()сFOR UPDATE SKIP LOCKED. - Переводит в
running, увеличиваетattempt, выставляетlease_expires_at, делает heartbeat каждыеDL_HEARTBEAT_SEC. - Захватывает advisory‑lock по
lock_key(если не получилось - возвращает вqueuedс бэкоффом). - Выполняет пайплайн (
task) с поддержкой итеративных шагов и кооперативной отмены. - По завершении:
succeededилиfailed/canceled; при ошибках возможны ретраи доmax_attempts.
Протокол выполнения задачи (SQL)
Claim (захват задачи):
WITH cte AS (
SELECT job_id
FROM dl_jobs
WHERE status = 'queued'
AND queue = :queue
AND available_at <= now()
ORDER BY priority ASC, created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE dl_jobs j
SET status = 'running',
started_at = COALESCE(started_at, now()),
attempt = attempt + 1,
lease_expires_at = now() + make_interval(secs => j.lease_ttl_sec),
heartbeat_at = now()
FROM cte
WHERE j.job_id = cte.job_id
RETURNING j.job_id, j.task, j.args, j.lock_key, j.lease_ttl_sec;
Heartbeat (продление аренды):
UPDATE dl_jobs
SET heartbeat_at = now(),
lease_expires_at = now() + make_interval(secs => :ttl)
WHERE job_id = :job_id AND status = 'running'
RETURNING cancel_requested;
Завершение успешное:
UPDATE dl_jobs
SET status = 'succeeded',
finished_at = now(),
lease_expires_at = NULL
WHERE job_id = :job_id;
SELECT pg_advisory_unlock(hashtext(:lock_key));
Завершение с ошибкой (retry):
UPDATE dl_jobs
SET status = CASE WHEN attempt < max_attempts THEN 'queued' ELSE 'failed' END,
available_at = CASE WHEN attempt < max_attempts
THEN now() + make_interval(secs => 30 * attempt)
ELSE now() END,
error = :error_message,
lease_expires_at = NULL,
finished_at = CASE WHEN attempt >= max_attempts THEN now() ELSE NULL END
WHERE job_id = :job_id;
SELECT pg_advisory_unlock(hashtext(:lock_key));
Reaper (возврат потерянных задач):
UPDATE dl_jobs
SET status = 'queued',
available_at = now(),
lease_expires_at = NULL
WHERE status = 'running'
AND lease_expires_at IS NOT NULL
AND lease_expires_at < now()
RETURNING job_id;
Интерфейс пайплайна
Пайплайн - обычная функция, возвращающая одно из:
- асинхронный генератор шагов (рекомендуется для длинных процессов),
- корутину,
- синхронную функцию.
Каждый «yield» в асинхронном генераторе - безопасная точка, где воркер выполнит heartbeat и проверит cancel_requested.
Регистрация нового пайплайна через декоратор @register("task_name") в модуле src/dataloader/workers/pipelines/<your_task>.py.
Пример:
from __future__ import annotations
import asyncio
from typing import AsyncIterator
from dataloader.workers.pipelines.registry import register
@register("load.customers")
async def load_customers(args: dict) -> AsyncIterator[None]:
# шаг 1 – вытягиваем данные
await some_fetch(args)
yield
# шаг 2 – пишем в БД идемпотентно (upsert/merge)
await upsert_customers(args)
yield
# шаг 3 – пост‑обработка
await finalize(args)
yield
Важно:
- Пайплайны должны быть идемпотентны (повторный запуск не должен ломать данные).
- Долгие операции разбивайте на шаги с
yield, чтобы работал heartbeat и отмена. - Для бизнес‑взаимного исключения выбирайте корректный
lock_key(например,customer:{id}), чтобы параллельные задачи не конфликтовали.
Добавление ETL‑задачи (шаги)
1. Создать пайплайн в src/dataloader/workers/pipelines/:
# src/dataloader/workers/pipelines/load_cbr_rates.py
from __future__ import annotations
from typing import AsyncIterator
from datetime import datetime
from dataloader.workers.pipelines.registry import register
@register("load.cbr.rates")
async def load_cbr_rates(args: dict) -> AsyncIterator[None]:
"""
Загрузка курсов валют ЦБ РФ.
Args:
args: {"date": "YYYY-MM-DD", "currencies": ["USD", "EUR"]}
"""
date = datetime.fromisoformat(args["date"])
currencies = args.get("currencies", ["USD", "EUR"])
# Извлечение данных
data = await fetch_cbr_rates(date, currencies)
yield # Heartbeat checkpoint
# Трансформация
transformed = transform_rates(data)
yield # Heartbeat checkpoint
# Идемпотентная загрузка в БД
async with get_target_session() as session:
await session.execute(
insert(CbrRates)
.values(transformed)
.on_conflict_do_update(
index_elements=["date", "currency"],
set_={"rate": excluded.c.rate, "updated_at": func.now()}
)
)
await session.commit()
yield
2. Настроить воркеры в .env:
WORKERS_JSON='[{"queue":"load.cbr","concurrency":2}]'
3. Поставить задачу через API:
curl -X POST http://localhost:8081/api/v1/jobs/trigger \
-H "Content-Type: application/json" \
-d '{
"queue": "load.cbr",
"task": "load.cbr.rates",
"args": {"date": "2025-01-10", "currencies": ["USD", "EUR"]},
"lock_key": "cbr_rates_2025-01-10",
"partition_key": "2025-01-10",
"priority": 100,
"max_attempts": 3,
"lease_ttl_sec": 300
}'
4. Мониторить выполнение:
# Получить job_id из ответа и проверить статус
curl http://localhost:8081/api/v1/jobs/{job_id}/status
Ключевые моменты:
- Пайплайн должен быть идемпотентным (повторный запуск не должен ломать данные)
- Используйте
yieldпосле каждого значимого чанка работы для heartbeat lock_keyдолжен обеспечивать взаимное исключение (например,customer:{id})partition_keyиспользуется для параллелизации независимых задач
Существующие пайплайны
load.tenera - Загрузка котировок
Описание: Загружает финансовые котировки (биржевые индексы, валюты, сырьевые товары) из SuperTenera API.
Источник данных:
- SuperTenera API - агрегатор котировок из множества источников (CBR, Investing.com, SGX, Bloomberg, TradingView, TradingEconomics)
Назначение:
- Таблицы в схеме
quotes:quote_section- разделы котировок (CBR, SGX, и т.д.)quote- инструменты (тикеры, названия)quote_value- временные ряды значений (OHLCV + дополнительные метрики)
Процесс:
- Запрос данных из SuperTenera API
- Парсинг различных форматов (каждый источник имеет свою структуру)
- Трансформация в единый формат с преобразованием временных зон
- UPSERT в БД (идемпотентная вставка/обновление)
Пример запуска:
# Настроить воркер в .env
WORKERS_JSON='[{"queue":"load.tenera","concurrency":1}]'
# Запустить задачу через API
curl -X POST http://localhost:8081/api/v1/jobs/trigger \
-H "Content-Type: application/json" \
-d '{
"queue": "load.tenera",
"task": "load.tenera",
"args": {},
"lock_key": "tenera_quotes",
"priority": 100,
"max_attempts": 3,
"lease_ttl_sec": 300
}'
Особенности:
- Поддержка множества форматов временных меток (ISO, Unix timestamp, кастомные форматы)
- Автоматическое преобразование временных зон в Europe/Moscow
- Обработка разных структур данных от различных провайдеров
- UPSERT по композитному ключу (quote_id + date/timestamp)
load.opu - Загрузка данных OPU
Описание: Загружает данные OPU из Gmap2Brief API. Данные выгружаются в виде zstandard-сжатого архива с JSON Lines.
Источник данных:
- Gmap2Brief API - экспорт данных о структуре OPU
Назначение:
- Таблица
brief_digital_certificate_opuв схемеopu(23 поля, композитный PK из 8 полей)
Процесс:
- Запуск экспорта через API (
/export/opu/start) - Polling статуса экспорта до завершения
- Скачивание zstandard-архива с JSON Lines
- TRUNCATE целевой таблицы (полная перезагрузка)
- Потоковая распаковка архива (64KB чанки)
- Парсинг JSON Lines и батчевая вставка (по 5000 записей)
- Преобразование ISO-дат в date/datetime объекты
Пример запуска:
# Настроить воркер в .env
WORKERS_JSON='[{"queue":"load.opu","concurrency":1}]'
# Запустить задачу через API
curl -X POST http://localhost:8081/api/v1/jobs/trigger \
-H "Content-Type: application/json" \
-d '{
"queue": "load.opu",
"task": "load.opu",
"args": {},
"lock_key": "opu_export",
"priority": 100,
"max_attempts": 2,
"lease_ttl_sec": 600
}'
Особенности:
- Потоковая обработка больших архивов без полной загрузки в память
- Zstandard декомпрессия с буферизацией неполных строк
- TRUNCATE перед загрузкой (стратегия полной замены данных)
- Батчевая вставка для оптимизации производительности
- Heartbeat после каждого батча для отслеживания прогресса
- Композитный первичный ключ из 8 полей обеспечивает уникальность
noop - Тестовый пайплайн
Описание: Демонстрационный пайплайн для тестирования инфраструктуры очереди. Выполняет серию sleep-операций с heartbeat.
Пример запуска:
curl -X POST http://localhost:8081/api/v1/jobs/trigger \
-H "Content-Type: application/json" \
-d '{
"queue": "etl.default",
"task": "noop",
"args": {"sleep1": 2, "sleep2": 3, "sleep3": 1},
"lock_key": "test_noop",
"priority": 100
}'
Логирование, метрики, аудит
- Логи: структурированные, через
logger/*. Middleware (api/middleware.py) логирует входящие запросы/исходящие ответы, время обработки, пишет метрики и аудит‑события. - Метрики: простые счётчики (пример: likes/dislikes, requests_total, responses_total, duration_ms).
- Аудит: запись бизнес‑событий начала/окончания обработки запроса.
Тестирование
Структура тестов
tests/
├── conftest.py # Глобальные фикстуры (db_engine, db_session, client)
├── integration_tests/ # Интеграционные тесты с реальной БД
│ ├── test_queue_repository.py # 12 тестов репозитория
│ └── test_api_endpoints.py # 7 тестов API endpoints
└── unit/ # Юнит-тесты с моками (92 теста)
├── test_config.py # 30 тестов конфигурации
├── test_context.py # 13 тестов AppContext
├── test_api_service.py # 10 тестов сервисного слоя
├── test_notify_listener.py # 13 тестов LISTEN/NOTIFY
├── test_workers_base.py # 14 тестов PGWorker
├── test_workers_manager.py # 10 тестов WorkerManager
└── test_pipeline_registry.py # 5 тестов реестра пайплайнов
Запуск тестов
# Все тесты (111 тестов)
poetry run pytest
# Только юнит-тесты
poetry run pytest tests/unit/ -m unit
# Только интеграционные
poetry run pytest tests/integration_tests/ -m integration
# С покрытием кода
poetry run pytest --cov=dataloader --cov-report=html
# С подробным выводом
poetry run pytest -v -s
Покрытие кода
Текущее покрытие: 95.50% (778 строк / 743 покрыто)
Name Stmts Miss Cover Missing
--------------------------------------------------------------------------------
src\dataloader\__init__.py 2 0 100.00%
src\dataloader\api\__init__.py 31 10 67.74% 28-38
src\dataloader\api\metric_router.py 14 4 71.43% 23-27, 39-43
src\dataloader\api\os_router.py 11 2 81.82% 31-33
src\dataloader\api\schemas.py 13 0 100.00%
src\dataloader\api\v1\__init__.py 2 0 100.00%
src\dataloader\api\v1\exceptions.py 4 0 100.00%
src\dataloader\api\v1\models.py 0 0 100.00%
src\dataloader\api\v1\router.py 29 0 100.00%
src\dataloader\api\v1\schemas.py 33 1 96.97% 34
src\dataloader\api\v1\service.py 32 0 100.00%
src\dataloader\api\v1\utils.py 4 0 100.00%
src\dataloader\config.py 79 0 100.00%
src\dataloader\context.py 39 0 100.00%
src\dataloader\exceptions.py 0 0 100.00%
src\dataloader\storage\__init__.py 0 0 100.00%
src\dataloader\storage\engine.py 9 1 88.89% 52
src\dataloader\storage\models\__init__.py 4 0 100.00%
src\dataloader\storage\models\base.py 4 0 100.00%
src\dataloader\storage\models\queue.py 43 0 100.00%
src\dataloader\storage\notify_listener.py 49 0 100.00%
src\dataloader\storage\repositories\__init__.py 3 0 100.00%
src\dataloader\storage\repositories\queue.py 130 0 100.00%
src\dataloader\storage\schemas\__init__.py 3 0 100.00%
src\dataloader\storage\schemas\queue.py 29 0 100.00%
src\dataloader\workers\__init__.py 0 0 100.00%
src\dataloader\workers\base.py 102 0 100.00%
src\dataloader\workers\manager.py 64 0 100.00%
src\dataloader\workers\pipelines\__init__.py 11 5 54.55% 15-17, 24-25
src\dataloader\workers\pipelines\noop.py 12 12 0.00% 2-20
src\dataloader\workers\pipelines\registry.py 15 0 100.00%
src\dataloader\workers\reaper.py 7 0 100.00%
--------------------------------------------------------------------------------
TOTAL 778 35 95.50%
Ключевые тест-сценарии
Интеграционные тесты:
- Постановка задачи через API → проверка статуса
- Идемпотентность через
idempotency_key - Claim задачи → heartbeat → успешное завершение
- Claim задачи → ошибка → retry → финальный fail
- Конкуренция воркеров через advisory lock
- Возврат потерянных задач (reaper)
- Отмена задачи пользователем
Юнит-тесты:
- Конфигурация из переменных окружения
- Создание и управление воркерами
- LISTEN/NOTIFY механизм
- Сервисный слой и репозиторий
- Протокол heartbeat и отмены
Масштабирование
- Вертикальное: Увеличение
concurrencyвWORKERS_JSONдля существующих воркеров - Горизонтальное: Увеличение количества реплик (pods). Очередь в БД и advisory-lock обеспечат корректность при конкуренции между репликами
- По очередям: Разные deployment'ы для разных очередей с разными ресурсами
Graceful Shutdown
При получении SIGTERM:
- Останавливает прием новых HTTP запросов
- Сигнализирует воркерам о необходимости завершения
- Ждет завершения текущих задач (timeout 30 сек)
- Останавливает reaper
- Закрывает соединения с БД
Мониторинг
Health Checks:
GET /health- проверка работоспособности (без БД, < 20ms)GET /info- информация о версии
Метрики (если включен metric_router):
- Количество задач по статусам (queued, running, succeeded, failed)
- Время выполнения задач (p50, p95, p99)
- Количество активных воркеров
- Частота ошибок
Логи: Структурированные JSON-логи с уровнями: DEBUG, INFO, WARNING, ERROR, CRITICAL
Ключевые события для алертов:
worker.claim.backoff- частые backoff'ы (возможна конкуренция)worker.complete.failed- высокий процент ошибокreaper.requeued- частый возврат потерянных задач (проблемы с lease)api.error- ошибки API
Troubleshooting
Задачи застревают в статусе queued
Симптомы: Задачи не начинают выполняться, остаются в queued.
Возможные причины:
- Воркеры не запущены или упали
- Нет воркеров для данной очереди в
WORKERS_JSON available_atв будущем
Решение:
# Проверить задачи в БД
SELECT job_id, queue, status, available_at, created_at
FROM dl_jobs
WHERE status = 'queued'
ORDER BY created_at DESC
LIMIT 10;
Задачи часто возвращаются в queued (backoff)
Симптомы: В логах частые события worker.claim.backoff.
Причины:
- Конкуренция за
lock_key: несколько задач с одинаковымlock_keyодновременно - Advisory lock уже занят другим процессом
Решение:
- Проверить корректность выбора
lock_key(должен быть уникальным для бизнес-сущности) - Использовать
partition_keyдля распределения нагрузки - Снизить
concurrencyдля данной очереди
Высокий процент failed задач
Симптомы: Много задач завершаются с status = 'failed'.
Диагностика:
SELECT job_id, task, error, attempt, max_attempts
FROM dl_jobs
WHERE status = 'failed'
ORDER BY finished_at DESC
LIMIT 20;
Возможные причины:
- Ошибки в коде пайплайна
- Недоступность внешних сервисов
- Таймауты (превышение
lease_ttl_sec) - Неверные аргументы в
args
Решение:
- Проверить логи с
job_id - Увеличить
max_attemptsдля retry - Увеличить
lease_ttl_secдля долгих операций - Исправить код пайплайна
Медленное выполнение задач
Симптомы: Задачи выполняются дольше ожидаемого.
Диагностика:
SELECT
task,
AVG(EXTRACT(EPOCH FROM (finished_at - started_at))) as avg_duration_sec,
COUNT(*) as total
FROM dl_jobs
WHERE status IN ('succeeded', 'failed')
AND finished_at > NOW() - INTERVAL '1 hour'
GROUP BY task
ORDER BY avg_duration_sec DESC;
Возможные причины:
- Неоптимальный код пайплайна
- Медленные внешние сервисы
- Недостаточно воркеров (
concurrencyслишком мал) - Проблемы с БД (медленные запросы, блокировки)
Решение:
- Профилировать код пайплайна
- Увеличить
concurrencyвWORKERS_JSON - Оптимизировать запросы к БД (индексы, batching)
- Масштабировать горизонтально (больше реплик)
Проблемы с LISTEN/NOTIFY
Симптомы: Воркеры не просыпаются сразу после постановки задачи.
Диагностика:
# Проверить триггеры в БД
SELECT * FROM pg_trigger WHERE tgname LIKE 'dl_jobs%';
Возможные причины:
- Триггеры не созданы или отключены
- Проблемы с подключением asyncpg
- Воркер не подписан на канал
Решение:
- Воркер автоматически fallback'ится на polling при проблемах с LISTEN/NOTIFY
- Проверить DDL: триггеры
dl_jobs_notify_insиdl_jobs_notify_upd - Проверить права пользователя БД на LISTEN/NOTIFY