dataloader/README.md

742 lines
32 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Dataloader
Асинхронный сервис постановки и исполнения ETLзадач поверх очереди в PostgreSQL. Предоставляет HTTP API для триггера задач, мониторинга статуса и отмены. Внутри процесса поднимает пул воркеров, которые конкурируют за задачи через SELECT … FOR UPDATE SKIP LOCKED и обрабатывают их с учётом lease/heartbeat и кооперативной отмены. Для пробуждения воркеров используется LISTEN/NOTIFY.
## Содержание
- О проекте
- Быстрый старт
- Конфигурация окружения
- Архитектура и потоки данных
- Структура проекта
- Взаимодействие с БД
- HTTP API
- Воркеры, пайплайны и добавление новых ETLзадач
- Логирование, метрики, аудит
- Тестирование
- Эксплуатация и масштабирование
## О проекте
Сервис решает типовую задачу фоновой обработки задач: один общий пул воркеров, одна очередь в БД, несколько типов задач (пайплайнов), идемпотентность, повторные попытки, контроль конкуренции через advisorylock по `lock_key`, кооперативная отмена, возврат «потерянных» задач.
## Быстрый старт
1. Установить зависимости (poetry):
```bash
poetry install
```
2. Подготовить PostgreSQL (см. DDL в `DDL.sql`) и переменные окружения (см. «Конфигурация»).
3. Запуск сервиса:
```bash
poetry run python -m dataloader
```
4. Проверка доступности:
```bash
curl http://localhost:8081/health
```
5. Пример постановки задачи:
```bash
curl -X POST http://localhost:8081/api/v1/jobs/trigger \
-H "Content-Type: application/json" \
-d '{
"queue": "etl.default",
"task": "noop",
"args": {"sleep1": 1, "sleep2": 1, "sleep3": 1},
"lock_key": "customer:42",
"priority": 100
}'
```
## Конфигурация окружения
Настройки собираются в `src/dataloader/config.py` через Pydantic Settings.
- Приложение (`AppSettings`):
- `APP_HOST` (def: `0.0.0.0`)
- `APP_PORT` (def: `8081`)
- `TIMEZONE` (def: `Europe/Moscow`)
- Логирование (`LogSettings`):
- `LOG_PATH`, `LOG_FILE_NAME`, `LOG_ROTATION`
- `METRIC_PATH`, `METRIC_FILE_NAME`
- `AUDIT_LOG_PATH`, `AUDIT_LOG_FILE_NAME`
- `DEBUG` переключает уровень на DEBUG
- PostgreSQL (`PGSettings`):
- `PG_HOST`, `PG_PORT`, `PG_USER`, `PG_PASSWORD`, `PG_DATABASE`
- `PG_SCHEMA_QUEUE` - схема таблиц очереди (логическая, маппится через `schema_translate_map`)
- Параметры пула: `PG_USE_POOL`, `PG_POOL_SIZE`, `PG_MAX_OVERFLOW`, `PG_POOL_RECYCLE`
- Таймауты: `PG_CONNECT_TIMEOUT`, `PG_COMMAND_TIMEOUT`
- Воркеры (`WorkerSettings`):
- `WORKERS_JSON` - список конфигураций воркеров, например: `[{"queue":"etl.default","concurrency":2}]`
- `DL_HEARTBEAT_SEC` (def: 10)
- `DL_DEFAULT_LEASE_TTL_SEC` (def: 60)
- `DL_REAPER_PERIOD_SEC` (def: 10)
- `DL_CLAIM_BACKOFF_SEC` (def: 15)
## Архитектура и потоки данных
- HTTP слой: FastAPIприложение (`dataloader.api`) с v1 API и инфраструктурными маршрутами (`/health`, `/info`).
- Контекст: `AppContext` инициализирует логирование, `AsyncEngine`, `async_sessionmaker` и предоставляет DI (`get_session`).
- Очередь: одна таблица `dl_jobs` и журнал `dl_job_events` в PostgreSQL. Идемпотентность на уровне `idempotency_key`. Пробуждение воркеров через триггеры NOTIFY в БД и listener на стороне приложения.
- Воркеры: `WorkerManager` поднимает N асинхронных воркеров (`PGWorker`) на основании `WORKERS_JSON`. Каждый воркер:
1) ждёт уведомление (LISTEN/NOTIFY) или таймаут,
2) пытается «захватить» одну задачу (SELECT … FOR UPDATE SKIP LOCKED),
3) выставляет `running`, получает advisorylock по `lock_key`,
4) исполняет соответствующий пайплайн с heartbeat,
5) завершает задачу: `succeeded`/`failed`/`canceled` или возвращает в очередь на ретрай.
- Реапер: фоновая задача, периодически возвращает «потерянные» runningзадачи в `queued`.
## Структура
```
src/dataloader/
├── __main__.py # Запуск uvicorn, lifecycle через FastAPI lifespan
├── config.py # Pydantic-настройки (app/log/pg/worker)
├── context.py # AppContext: engine, sessionmaker, логгер, DI
├── api/
│ ├── __init__.py # FastAPI app, middleware, routers, lifespan
│ ├── middleware.py # Логирование запросов + метрики/аудит
│ ├── os_router.py # /health, /info (инфраструктурные ручки)
│ ├── metric_router.py # Примеры метрик (like/dislike)
│ │
│ └── v1/
│ ├── router.py # /api/v1/jobs: trigger, status, cancel
│ ├── service.py # Бизнес-логика поверх репозитория
│ ├── schemas.py # Pydantic DTO API
│ └── utils.py # Утилиты (генерация UUID и т.д.)
├── storage/
│ ├── engine.py # AsyncEngine, sessionmaker, schema_translate_map
│ ├── notify_listener.py # asyncpg LISTEN/NOTIFY по каналу dl_jobs
│ │
│ ├── models/
│ │ ├── base.py # Declarative base
│ │ └── queue.py # ORM-модели DLJob, DLJobEvent
│ │
│ ├── repositories/
│ │ └── queue.py # QueueRepository (CRUD операции)
│ │
│ └── schemas/
│ └── queue.py # CreateJobRequest, JobStatus
├── workers/
│ ├── base.py # PGWorker: главный цикл, heartbeat, вызов пайплайнов
│ ├── manager.py # WorkerManager: запуск/остановка + reaper
│ ├── reaper.py # Requeue_lost на базе репозитория
│ │
│ └── pipelines/
│ ├── __init__.py # Автозагрузка модулей для регистрации
│ ├── registry.py # Реестр обработчиков задач (@register)
│ └── noop.py # Пример эталонного пайплайна
└── logger/
└── ... # Логирование
```
## Взаимодействие с БД
- Подключение: `postgresql+asyncpg` через SQLAlchemy AsyncEngine.
- Схемы: логическое имя `queue` маппится на реальную через `schema_translate_map` (см. `engine.py`), имя реальной схемы задаётся `PG_SCHEMA_QUEUE`.
- DDL: см. `DDL.sql`. Ключевые элементы:
- `dl_jobs` с индексами на claim и runninglease,
- `dl_job_events` как журнал событий,
- триггер `notify_job_ready()` + `LISTEN dl_jobs` для пробуждения воркеров.
- Конкуренция: claim через `FOR UPDATE SKIP LOCKED`, взаимное исключение по бизнес‑сущности через advisorylock `pg_try_advisory_lock(hashtext(lock_key))`.
- Надёжность: atleastonce. Пайплайны должны быть идемпотентны в части записи в целевые таблицы.
## HTTP API (v1)
### POST `/api/v1/jobs/trigger`
Постановка задачи в очередь (идемпотентная операция).
**Request:**
```json
{
"queue": "load.cbr", // обязательно: имя очереди
"task": "load.cbr.rates", // обязательно: имя задачи для registry
"args": { // опционально: аргументы задачи
"date": "2025-01-10",
"currencies": ["USD", "EUR"]
},
"idempotency_key": "cbr_2025-01-10", // опционально: ключ идемпотентности
"lock_key": "cbr_rates", // обязательно: ключ для advisory lock
"partition_key": "2025-01-10", // опционально: ключ партиционирования
"priority": 100, // опционально: приоритет (меньше = выше)
"available_at": "2025-01-10T00:00:00Z", // опционально: отложенный запуск
"max_attempts": 3, // опционально: макс попыток (def: 5)
"lease_ttl_sec": 300, // опционально: TTL аренды (def: 60)
"producer": "api-client", // опционально: кто поставил
"consumer_group": "cbr-loaders" // опционально: группа потребителей
}
```
**Response 200:**
```json
{
"job_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "queued"
}
```
**Коды ответов:**
- `200 OK` - задача создана или уже существует (идемпотентность)
- `400 Bad Request` - невалидные данные
- `500 Internal Server Error` - ошибка сервера
### GET `/api/v1/jobs/{job_id}/status`
Получение статуса задачи.
**Response 200:**
```json
{
"job_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "running", // queued/running/succeeded/failed/canceled
"attempt": 1, // текущая попытка
"started_at": "2025-01-10T12:00:00Z", // время первого запуска
"finished_at": null, // время завершения (если есть)
"heartbeat_at": "2025-01-10T12:01:30Z", // последний heartbeat
"error": null, // текст ошибки (если есть)
"progress": { // прогресс выполнения (custom)
"processed": 500,
"total": 1000
}
}
```
**Коды ответов:**
- `200 OK` - статус получен
- `404 Not Found` - задача не найдена
### POST `/api/v1/jobs/{job_id}/cancel`
Запрос кооперативной отмены задачи.
**Response 200:**
```json
{
"job_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "running",
"attempt": 1,
"started_at": "2025-01-10T12:00:00Z",
"heartbeat_at": "2025-01-10T12:01:30Z"
}
```
**Поведение:**
- Устанавливает флаг `cancel_requested = true` в БД
- Воркер проверяет флаг между `yield` в пайплайне
- При обнаружении флага воркер завершает задачу со статусом `canceled`
**Коды ответов:**
- `200 OK` - запрос отмены принят
- `404 Not Found` - задача не найдена
### Инфраструктурные эндпоинты
**GET `/health`** - проверка работоспособности (без БД, < 20ms)
```json
{"status": "healthy"}
```
**GET `/info`** - информация о сервисе
```json
{
"service": "dataloader",
"version": "1.0.0",
"environment": "production"
}
```
## Воркеры, пайплайны и добавление новых ETLзадач
### Как работает воркер
1. Ожидает сигнал (LISTEN/NOTIFY) или таймаут `DL_CLAIM_BACKOFF_SEC`.
2. Пытается забрать одну задачу своей очереди: `status='queued' AND available_at<=now()` с `FOR UPDATE SKIP LOCKED`.
3. Переводит в `running`, увеличивает `attempt`, выставляет `lease_expires_at`, делает heartbeat каждые `DL_HEARTBEAT_SEC`.
4. Захватывает advisorylock по `lock_key` (если не получилось - возвращает в `queued` с бэкоффом).
5. Выполняет пайплайн (`task`) с поддержкой итеративных шагов и кооперативной отмены.
6. По завершении: `succeeded` или `failed`/`canceled`; при ошибках возможны ретраи до `max_attempts`.
### Протокол выполнения задачи (SQL)
**Claim (захват задачи):**
```sql
WITH cte AS (
SELECT job_id
FROM dl_jobs
WHERE status = 'queued'
AND queue = :queue
AND available_at <= now()
ORDER BY priority ASC, created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE dl_jobs j
SET status = 'running',
started_at = COALESCE(started_at, now()),
attempt = attempt + 1,
lease_expires_at = now() + make_interval(secs => j.lease_ttl_sec),
heartbeat_at = now()
FROM cte
WHERE j.job_id = cte.job_id
RETURNING j.job_id, j.task, j.args, j.lock_key, j.lease_ttl_sec;
```
**Heartbeat (продление аренды):**
```sql
UPDATE dl_jobs
SET heartbeat_at = now(),
lease_expires_at = now() + make_interval(secs => :ttl)
WHERE job_id = :job_id AND status = 'running'
RETURNING cancel_requested;
```
**Завершение успешное:**
```sql
UPDATE dl_jobs
SET status = 'succeeded',
finished_at = now(),
lease_expires_at = NULL
WHERE job_id = :job_id;
SELECT pg_advisory_unlock(hashtext(:lock_key));
```
**Завершение с ошибкой (retry):**
```sql
UPDATE dl_jobs
SET status = CASE WHEN attempt < max_attempts THEN 'queued' ELSE 'failed' END,
available_at = CASE WHEN attempt < max_attempts
THEN now() + make_interval(secs => 30 * attempt)
ELSE now() END,
error = :error_message,
lease_expires_at = NULL,
finished_at = CASE WHEN attempt >= max_attempts THEN now() ELSE NULL END
WHERE job_id = :job_id;
SELECT pg_advisory_unlock(hashtext(:lock_key));
```
**Reaper (возврат потерянных задач):**
```sql
UPDATE dl_jobs
SET status = 'queued',
available_at = now(),
lease_expires_at = NULL
WHERE status = 'running'
AND lease_expires_at IS NOT NULL
AND lease_expires_at < now()
RETURNING job_id;
```
### Интерфейс пайплайна
Пайплайн - обычная функция, возвращающая одно из:
- асинхронный генератор шагов (рекомендуется для длинных процессов),
- корутину,
- синхронную функцию.
Каждый «yield» в асинхронном генераторе - безопасная точка, где воркер выполнит heartbeat и проверит `cancel_requested`.
Регистрация нового пайплайна через декоратор `@register("task_name")` в модуле `src/dataloader/workers/pipelines/<your_task>.py`.
Пример:
```python
from __future__ import annotations
import asyncio
from typing import AsyncIterator
from dataloader.workers.pipelines.registry import register
@register("load.customers")
async def load_customers(args: dict) -> AsyncIterator[None]:
# шаг 1 вытягиваем данные
await some_fetch(args)
yield
# шаг 2 пишем в БД идемпотентно (upsert/merge)
await upsert_customers(args)
yield
# шаг 3 пост‑обработка
await finalize(args)
yield
```
Важно:
- Пайплайны должны быть идемпотентны (повторный запуск не должен ломать данные).
- Долгие операции разбивайте на шаги с `yield`, чтобы работал heartbeat и отмена.
- Для бизнесвзаимного исключения выбирайте корректный `lock_key` (например, `customer:{id}`), чтобы параллельные задачи не конфликтовали.
### Добавление ETLзадачи (шаги)
**1. Создать пайплайн** в `src/dataloader/workers/pipelines/`:
```python
# src/dataloader/workers/pipelines/load_cbr_rates.py
from __future__ import annotations
from typing import AsyncIterator
from datetime import datetime
from dataloader.workers.pipelines.registry import register
@register("load.cbr.rates")
async def load_cbr_rates(args: dict) -> AsyncIterator[None]:
"""
Загрузка курсов валют ЦБ РФ.
Args:
args: {"date": "YYYY-MM-DD", "currencies": ["USD", "EUR"]}
"""
date = datetime.fromisoformat(args["date"])
currencies = args.get("currencies", ["USD", "EUR"])
# Извлечение данных
data = await fetch_cbr_rates(date, currencies)
yield # Heartbeat checkpoint
# Трансформация
transformed = transform_rates(data)
yield # Heartbeat checkpoint
# Идемпотентная загрузка в БД
async with get_target_session() as session:
await session.execute(
insert(CbrRates)
.values(transformed)
.on_conflict_do_update(
index_elements=["date", "currency"],
set_={"rate": excluded.c.rate, "updated_at": func.now()}
)
)
await session.commit()
yield
```
**2. Настроить воркеры** в `.env`:
```bash
WORKERS_JSON='[{"queue":"load.cbr","concurrency":2}]'
```
**3. Поставить задачу через API**:
```bash
curl -X POST http://localhost:8081/api/v1/jobs/trigger \
-H "Content-Type: application/json" \
-d '{
"queue": "load.cbr",
"task": "load.cbr.rates",
"args": {"date": "2025-01-10", "currencies": ["USD", "EUR"]},
"lock_key": "cbr_rates_2025-01-10",
"partition_key": "2025-01-10",
"priority": 100,
"max_attempts": 3,
"lease_ttl_sec": 300
}'
```
**4. Мониторить выполнение**:
```bash
# Получить job_id из ответа и проверить статус
curl http://localhost:8081/api/v1/jobs/{job_id}/status
```
**Ключевые моменты**:
- Пайплайн должен быть идемпотентным (повторный запуск не должен ломать данные)
- Используйте `yield` после каждого значимого чанка работы для heartbeat
- `lock_key` должен обеспечивать взаимное исключение (например, `customer:{id}`)
- `partition_key` используется для параллелизации независимых задач
## Логирование, метрики, аудит
- Логи: структурированные, через `logger/*`. Middleware (`api/middleware.py`) логирует входящие запросы/исходящие ответы, время обработки, пишет метрики и аудитсобытия.
- Метрики: простые счётчики (пример: likes/dislikes, requests_total, responses_total, duration_ms).
- Аудит: запись бизнессобытий начала/окончания обработки запроса.
## Тестирование
### Структура тестов
```
tests/
├── conftest.py # Глобальные фикстуры (db_engine, db_session, client)
├── integration_tests/ # Интеграционные тесты с реальной БД
│ ├── test_queue_repository.py # 12 тестов репозитория
│ └── test_api_endpoints.py # 7 тестов API endpoints
└── unit/ # Юнит-тесты с моками (92 теста)
├── test_config.py # 30 тестов конфигурации
├── test_context.py # 13 тестов AppContext
├── test_api_service.py # 10 тестов сервисного слоя
├── test_notify_listener.py # 13 тестов LISTEN/NOTIFY
├── test_workers_base.py # 14 тестов PGWorker
├── test_workers_manager.py # 10 тестов WorkerManager
└── test_pipeline_registry.py # 5 тестов реестра пайплайнов
```
### Запуск тестов
```bash
# Все тесты (111 тестов)
poetry run pytest
# Только юнит-тесты
poetry run pytest tests/unit/ -m unit
# Только интеграционные
poetry run pytest tests/integration_tests/ -m integration
# С покрытием кода
poetry run pytest --cov=dataloader --cov-report=html
# С подробным выводом
poetry run pytest -v -s
```
### Покрытие кода
Текущее покрытие: **95.50%** (778 строк / 743 покрыто)
```
Name Stmts Miss Cover Missing
--------------------------------------------------------------------------------
src\dataloader\__init__.py 2 0 100.00%
src\dataloader\api\__init__.py 31 10 67.74% 28-38
src\dataloader\api\metric_router.py 14 4 71.43% 23-27, 39-43
src\dataloader\api\os_router.py 11 2 81.82% 31-33
src\dataloader\api\schemas.py 13 0 100.00%
src\dataloader\api\v1\__init__.py 2 0 100.00%
src\dataloader\api\v1\exceptions.py 4 0 100.00%
src\dataloader\api\v1\models.py 0 0 100.00%
src\dataloader\api\v1\router.py 29 0 100.00%
src\dataloader\api\v1\schemas.py 33 1 96.97% 34
src\dataloader\api\v1\service.py 32 0 100.00%
src\dataloader\api\v1\utils.py 4 0 100.00%
src\dataloader\config.py 79 0 100.00%
src\dataloader\context.py 39 0 100.00%
src\dataloader\exceptions.py 0 0 100.00%
src\dataloader\storage\__init__.py 0 0 100.00%
src\dataloader\storage\engine.py 9 1 88.89% 52
src\dataloader\storage\models\__init__.py 4 0 100.00%
src\dataloader\storage\models\base.py 4 0 100.00%
src\dataloader\storage\models\queue.py 43 0 100.00%
src\dataloader\storage\notify_listener.py 49 0 100.00%
src\dataloader\storage\repositories\__init__.py 3 0 100.00%
src\dataloader\storage\repositories\queue.py 130 0 100.00%
src\dataloader\storage\schemas\__init__.py 3 0 100.00%
src\dataloader\storage\schemas\queue.py 29 0 100.00%
src\dataloader\workers\__init__.py 0 0 100.00%
src\dataloader\workers\base.py 102 0 100.00%
src\dataloader\workers\manager.py 64 0 100.00%
src\dataloader\workers\pipelines\__init__.py 11 5 54.55% 15-17, 24-25
src\dataloader\workers\pipelines\noop.py 12 12 0.00% 2-20
src\dataloader\workers\pipelines\registry.py 15 0 100.00%
src\dataloader\workers\reaper.py 7 0 100.00%
--------------------------------------------------------------------------------
TOTAL 778 35 95.50%
```
### Ключевые тест-сценарии
**Интеграционные тесты:**
- Постановка задачи через API проверка статуса
- Идемпотентность через `idempotency_key`
- Claim задачи heartbeat успешное завершение
- Claim задачи ошибка retry финальный fail
- Конкуренция воркеров через advisory lock
- Возврат потерянных задач (reaper)
- Отмена задачи пользователем
**Юнит-тесты:**
- Конфигурация из переменных окружения
- Создание и управление воркерами
- LISTEN/NOTIFY механизм
- Сервисный слой и репозиторий
- Протокол heartbeat и отмены
### Масштабирование
- **Вертикальное**: Увеличение `concurrency` в `WORKERS_JSON` для существующих воркеров
- **Горизонтальное**: Увеличение количества реплик (pods). Очередь в БД и advisory-lock обеспечат корректность при конкуренции между репликами
- **По очередям**: Разные deployment'ы для разных очередей с разными ресурсами
### Graceful Shutdown
При получении SIGTERM:
1. Останавливает прием новых HTTP запросов
2. Сигнализирует воркерам о необходимости завершения
3. Ждет завершения текущих задач (timeout 30 сек)
4. Останавливает reaper
5. Закрывает соединения с БД
### Мониторинг
**Health Checks:**
- `GET /health` - проверка работоспособности (без БД, < 20ms)
- `GET /info` - информация о версии
**Метрики (если включен metric_router):**
- Количество задач по статусам (queued, running, succeeded, failed)
- Время выполнения задач (p50, p95, p99)
- Количество активных воркеров
- Частота ошибок
**Логи:**
Структурированные JSON-логи с уровнями: DEBUG, INFO, WARNING, ERROR, CRITICAL
**Ключевые события для алертов:**
- `worker.claim.backoff` - частые backoff'ы (возможна конкуренция)
- `worker.complete.failed` - высокий процент ошибок
- `reaper.requeued` - частый возврат потерянных задач (проблемы с lease)
- `api.error` - ошибки API
## Troubleshooting
### Задачи застревают в статусе `queued`
**Симптомы:** Задачи не начинают выполняться, остаются в `queued`.
**Возможные причины:**
1. Воркеры не запущены или упали
2. Нет воркеров для данной очереди в `WORKERS_JSON`
3. `available_at` в будущем
**Решение:**
```bash
# Проверить логи воркеров
docker logs dataloader | grep worker
# Проверить конфигурацию
echo $WORKERS_JSON
# Проверить задачи в БД
SELECT job_id, queue, status, available_at, created_at
FROM dl_jobs
WHERE status = 'queued'
ORDER BY created_at DESC
LIMIT 10;
```
### Задачи часто возвращаются в `queued` (backoff)
**Симптомы:** В логах частые события `worker.claim.backoff`.
**Причины:**
- Конкуренция за `lock_key`: несколько задач с одинаковым `lock_key` одновременно
- Advisory lock уже занят другим процессом
**Решение:**
- Проверить корректность выбора `lock_key` (должен быть уникальным для бизнес-сущности)
- Использовать `partition_key` для распределения нагрузки
- Снизить `concurrency` для данной очереди
### Высокий процент `failed` задач
**Симптомы:** Много задач завершаются с `status = 'failed'`.
**Диагностика:**
```sql
SELECT job_id, task, error, attempt, max_attempts
FROM dl_jobs
WHERE status = 'failed'
ORDER BY finished_at DESC
LIMIT 20;
```
**Возможные причины:**
- Ошибки в коде пайплайна
- Недоступность внешних сервисов
- Таймауты (превышение `lease_ttl_sec`)
- Неверные аргументы в `args`
**Решение:**
- Проверить логи с `job_id`
- Увеличить `max_attempts` для retry
- Увеличить `lease_ttl_sec` для долгих операций
- Исправить код пайплайна
### Медленное выполнение задач
**Симптомы:** Задачи выполняются дольше ожидаемого.
**Диагностика:**
```sql
SELECT
task,
AVG(EXTRACT(EPOCH FROM (finished_at - started_at))) as avg_duration_sec,
COUNT(*) as total
FROM dl_jobs
WHERE status IN ('succeeded', 'failed')
AND finished_at > NOW() - INTERVAL '1 hour'
GROUP BY task
ORDER BY avg_duration_sec DESC;
```
**Возможные причины:**
- Неоптимальный код пайплайна
- Медленные внешние сервисы
- Недостаточно воркеров (`concurrency` слишком мал)
- Проблемы с БД (медленные запросы, блокировки)
**Решение:**
- Профилировать код пайплайна
- Увеличить `concurrency` в `WORKERS_JSON`
- Оптимизировать запросы к БД (индексы, batching)
- Масштабировать горизонтально (больше реплик)
### Утечка памяти
**Симптомы:** Постепенный рост потребления памяти, OOM kills.
**Диагностика:**
```bash
# Мониторинг памяти
kubectl top pods -l app=dataloader --containers
# Проверить логи перед падением
kubectl logs dataloader-xxx --previous
```
**Возможные причины:**
- Накопление объектов в памяти в пайплайне
- Незакрытые соединения/файлы
- Утечки в зависимостях
**Решение:**
- Использовать context managers (`async with`) для ресурсов
- Обрабатывать данные чанками, не загружать всё в память
- Периодически перезапускать воркеры (restart policy)
### Проблемы с LISTEN/NOTIFY
**Симптомы:** Воркеры не просыпаются сразу после постановки задачи.
**Диагностика:**
```bash
# Проверить логи listener
docker logs dataloader | grep "notify_listener\|LISTEN"
# Проверить триггеры в БД
SELECT * FROM pg_trigger WHERE tgname LIKE 'dl_jobs%';
```
**Возможные причины:**
- Триггеры не созданы или отключены
- Проблемы с подключением asyncpg
- Воркер не подписан на канал
**Решение:**
- Воркер автоматически fallback'ится на polling при проблемах с LISTEN/NOTIFY
- Проверить DDL: триггеры `dl_jobs_notify_ins` и `dl_jobs_notify_upd`
- Проверить права пользователя БД на LISTEN/NOTIFY