742 lines
32 KiB
Markdown
742 lines
32 KiB
Markdown
# Dataloader
|
||
|
||
Асинхронный сервис постановки и исполнения ETL‑задач поверх очереди в PostgreSQL. Предоставляет HTTP API для триггера задач, мониторинга статуса и отмены. Внутри процесса поднимает пул воркеров, которые конкурируют за задачи через SELECT … FOR UPDATE SKIP LOCKED и обрабатывают их с учётом lease/heartbeat и кооперативной отмены. Для пробуждения воркеров используется LISTEN/NOTIFY.
|
||
|
||
|
||
## Содержание
|
||
- О проекте
|
||
- Быстрый старт
|
||
- Конфигурация окружения
|
||
- Архитектура и потоки данных
|
||
- Структура проекта
|
||
- Взаимодействие с БД
|
||
- HTTP API
|
||
- Воркеры, пайплайны и добавление новых ETL‑задач
|
||
- Логирование, метрики, аудит
|
||
- Тестирование
|
||
- Эксплуатация и масштабирование
|
||
|
||
|
||
## О проекте
|
||
Сервис решает типовую задачу фоновой обработки задач: один общий пул воркеров, одна очередь в БД, несколько типов задач (пайплайнов), идемпотентность, повторные попытки, контроль конкуренции через advisory‑lock по `lock_key`, кооперативная отмена, возврат «потерянных» задач.
|
||
|
||
|
||
## Быстрый старт
|
||
1. Установить зависимости (poetry):
|
||
```bash
|
||
poetry install
|
||
```
|
||
2. Подготовить PostgreSQL (см. DDL в `DDL.sql`) и переменные окружения (см. «Конфигурация»).
|
||
3. Запуск сервиса:
|
||
```bash
|
||
poetry run python -m dataloader
|
||
```
|
||
4. Проверка доступности:
|
||
```bash
|
||
curl http://localhost:8081/health
|
||
```
|
||
5. Пример постановки задачи:
|
||
```bash
|
||
curl -X POST http://localhost:8081/api/v1/jobs/trigger \
|
||
-H "Content-Type: application/json" \
|
||
-d '{
|
||
"queue": "etl.default",
|
||
"task": "noop",
|
||
"args": {"sleep1": 1, "sleep2": 1, "sleep3": 1},
|
||
"lock_key": "customer:42",
|
||
"priority": 100
|
||
}'
|
||
```
|
||
|
||
|
||
## Конфигурация окружения
|
||
Настройки собираются в `src/dataloader/config.py` через Pydantic Settings.
|
||
|
||
- Приложение (`AppSettings`):
|
||
- `APP_HOST` (def: `0.0.0.0`)
|
||
- `APP_PORT` (def: `8081`)
|
||
- `TIMEZONE` (def: `Europe/Moscow`)
|
||
|
||
- Логирование (`LogSettings`):
|
||
- `LOG_PATH`, `LOG_FILE_NAME`, `LOG_ROTATION`
|
||
- `METRIC_PATH`, `METRIC_FILE_NAME`
|
||
- `AUDIT_LOG_PATH`, `AUDIT_LOG_FILE_NAME`
|
||
- `DEBUG` переключает уровень на DEBUG
|
||
|
||
- PostgreSQL (`PGSettings`):
|
||
- `PG_HOST`, `PG_PORT`, `PG_USER`, `PG_PASSWORD`, `PG_DATABASE`
|
||
- `PG_SCHEMA_QUEUE` - схема таблиц очереди (логическая, маппится через `schema_translate_map`)
|
||
- Параметры пула: `PG_USE_POOL`, `PG_POOL_SIZE`, `PG_MAX_OVERFLOW`, `PG_POOL_RECYCLE`
|
||
- Таймауты: `PG_CONNECT_TIMEOUT`, `PG_COMMAND_TIMEOUT`
|
||
|
||
- Воркеры (`WorkerSettings`):
|
||
- `WORKERS_JSON` - список конфигураций воркеров, например: `[{"queue":"etl.default","concurrency":2}]`
|
||
- `DL_HEARTBEAT_SEC` (def: 10)
|
||
- `DL_DEFAULT_LEASE_TTL_SEC` (def: 60)
|
||
- `DL_REAPER_PERIOD_SEC` (def: 10)
|
||
- `DL_CLAIM_BACKOFF_SEC` (def: 15)
|
||
|
||
|
||
## Архитектура и потоки данных
|
||
|
||
- HTTP слой: FastAPI‑приложение (`dataloader.api`) с v1 API и инфраструктурными маршрутами (`/health`, `/info`).
|
||
- Контекст: `AppContext` инициализирует логирование, `AsyncEngine`, `async_sessionmaker` и предоставляет DI (`get_session`).
|
||
- Очередь: одна таблица `dl_jobs` и журнал `dl_job_events` в PostgreSQL. Идемпотентность на уровне `idempotency_key`. Пробуждение воркеров через триггеры NOTIFY в БД и listener на стороне приложения.
|
||
- Воркеры: `WorkerManager` поднимает N асинхронных воркеров (`PGWorker`) на основании `WORKERS_JSON`. Каждый воркер:
|
||
1) ждёт уведомление (LISTEN/NOTIFY) или таймаут,
|
||
2) пытается «захватить» одну задачу (SELECT … FOR UPDATE SKIP LOCKED),
|
||
3) выставляет `running`, получает advisory‑lock по `lock_key`,
|
||
4) исполняет соответствующий пайплайн с heartbeat,
|
||
5) завершает задачу: `succeeded`/`failed`/`canceled` или возвращает в очередь на ретрай.
|
||
- Реапер: фоновая задача, периодически возвращает «потерянные» running‑задачи в `queued`.
|
||
|
||
|
||
## Структура
|
||
```
|
||
src/dataloader/
|
||
├── __main__.py # Запуск uvicorn, lifecycle через FastAPI lifespan
|
||
├── config.py # Pydantic-настройки (app/log/pg/worker)
|
||
├── context.py # AppContext: engine, sessionmaker, логгер, DI
|
||
│
|
||
├── api/
|
||
│ ├── __init__.py # FastAPI app, middleware, routers, lifespan
|
||
│ ├── middleware.py # Логирование запросов + метрики/аудит
|
||
│ ├── os_router.py # /health, /info (инфраструктурные ручки)
|
||
│ ├── metric_router.py # Примеры метрик (like/dislike)
|
||
│ │
|
||
│ └── v1/
|
||
│ ├── router.py # /api/v1/jobs: trigger, status, cancel
|
||
│ ├── service.py # Бизнес-логика поверх репозитория
|
||
│ ├── schemas.py # Pydantic DTO API
|
||
│ └── utils.py # Утилиты (генерация UUID и т.д.)
|
||
│
|
||
├── storage/
|
||
│ ├── engine.py # AsyncEngine, sessionmaker, schema_translate_map
|
||
│ ├── notify_listener.py # asyncpg LISTEN/NOTIFY по каналу dl_jobs
|
||
│ │
|
||
│ ├── models/
|
||
│ │ ├── base.py # Declarative base
|
||
│ │ └── queue.py # ORM-модели DLJob, DLJobEvent
|
||
│ │
|
||
│ ├── repositories/
|
||
│ │ └── queue.py # QueueRepository (CRUD операции)
|
||
│ │
|
||
│ └── schemas/
|
||
│ └── queue.py # CreateJobRequest, JobStatus
|
||
│
|
||
├── workers/
|
||
│ ├── base.py # PGWorker: главный цикл, heartbeat, вызов пайплайнов
|
||
│ ├── manager.py # WorkerManager: запуск/остановка + reaper
|
||
│ ├── reaper.py # Requeue_lost на базе репозитория
|
||
│ │
|
||
│ └── pipelines/
|
||
│ ├── __init__.py # Автозагрузка модулей для регистрации
|
||
│ ├── registry.py # Реестр обработчиков задач (@register)
|
||
│ └── noop.py # Пример эталонного пайплайна
|
||
│
|
||
└── logger/
|
||
└── ... # Логирование
|
||
```
|
||
|
||
|
||
## Взаимодействие с БД
|
||
|
||
- Подключение: `postgresql+asyncpg` через SQLAlchemy AsyncEngine.
|
||
- Схемы: логическое имя `queue` маппится на реальную через `schema_translate_map` (см. `engine.py`), имя реальной схемы задаётся `PG_SCHEMA_QUEUE`.
|
||
- DDL: см. `DDL.sql`. Ключевые элементы:
|
||
- `dl_jobs` с индексами на claim и running‑lease,
|
||
- `dl_job_events` как журнал событий,
|
||
- триггер `notify_job_ready()` + `LISTEN dl_jobs` для пробуждения воркеров.
|
||
- Конкуренция: claim через `FOR UPDATE SKIP LOCKED`, взаимное исключение по бизнес‑сущности через advisory‑lock `pg_try_advisory_lock(hashtext(lock_key))`.
|
||
- Надёжность: at‑least‑once. Пайплайны должны быть идемпотентны в части записи в целевые таблицы.
|
||
|
||
|
||
## HTTP API (v1)
|
||
|
||
### POST `/api/v1/jobs/trigger`
|
||
|
||
Постановка задачи в очередь (идемпотентная операция).
|
||
|
||
**Request:**
|
||
```json
|
||
{
|
||
"queue": "load.cbr", // обязательно: имя очереди
|
||
"task": "load.cbr.rates", // обязательно: имя задачи для registry
|
||
"args": { // опционально: аргументы задачи
|
||
"date": "2025-01-10",
|
||
"currencies": ["USD", "EUR"]
|
||
},
|
||
"idempotency_key": "cbr_2025-01-10", // опционально: ключ идемпотентности
|
||
"lock_key": "cbr_rates", // обязательно: ключ для advisory lock
|
||
"partition_key": "2025-01-10", // опционально: ключ партиционирования
|
||
"priority": 100, // опционально: приоритет (меньше = выше)
|
||
"available_at": "2025-01-10T00:00:00Z", // опционально: отложенный запуск
|
||
"max_attempts": 3, // опционально: макс попыток (def: 5)
|
||
"lease_ttl_sec": 300, // опционально: TTL аренды (def: 60)
|
||
"producer": "api-client", // опционально: кто поставил
|
||
"consumer_group": "cbr-loaders" // опционально: группа потребителей
|
||
}
|
||
```
|
||
|
||
**Response 200:**
|
||
```json
|
||
{
|
||
"job_id": "550e8400-e29b-41d4-a716-446655440000",
|
||
"status": "queued"
|
||
}
|
||
```
|
||
|
||
**Коды ответов:**
|
||
- `200 OK` - задача создана или уже существует (идемпотентность)
|
||
- `400 Bad Request` - невалидные данные
|
||
- `500 Internal Server Error` - ошибка сервера
|
||
|
||
### GET `/api/v1/jobs/{job_id}/status`
|
||
|
||
Получение статуса задачи.
|
||
|
||
**Response 200:**
|
||
```json
|
||
{
|
||
"job_id": "550e8400-e29b-41d4-a716-446655440000",
|
||
"status": "running", // queued/running/succeeded/failed/canceled
|
||
"attempt": 1, // текущая попытка
|
||
"started_at": "2025-01-10T12:00:00Z", // время первого запуска
|
||
"finished_at": null, // время завершения (если есть)
|
||
"heartbeat_at": "2025-01-10T12:01:30Z", // последний heartbeat
|
||
"error": null, // текст ошибки (если есть)
|
||
"progress": { // прогресс выполнения (custom)
|
||
"processed": 500,
|
||
"total": 1000
|
||
}
|
||
}
|
||
```
|
||
|
||
**Коды ответов:**
|
||
- `200 OK` - статус получен
|
||
- `404 Not Found` - задача не найдена
|
||
|
||
### POST `/api/v1/jobs/{job_id}/cancel`
|
||
|
||
Запрос кооперативной отмены задачи.
|
||
|
||
**Response 200:**
|
||
```json
|
||
{
|
||
"job_id": "550e8400-e29b-41d4-a716-446655440000",
|
||
"status": "running",
|
||
"attempt": 1,
|
||
"started_at": "2025-01-10T12:00:00Z",
|
||
"heartbeat_at": "2025-01-10T12:01:30Z"
|
||
}
|
||
```
|
||
|
||
**Поведение:**
|
||
- Устанавливает флаг `cancel_requested = true` в БД
|
||
- Воркер проверяет флаг между `yield` в пайплайне
|
||
- При обнаружении флага воркер завершает задачу со статусом `canceled`
|
||
|
||
**Коды ответов:**
|
||
- `200 OK` - запрос отмены принят
|
||
- `404 Not Found` - задача не найдена
|
||
|
||
### Инфраструктурные эндпоинты
|
||
|
||
**GET `/health`** - проверка работоспособности (без БД, < 20ms)
|
||
```json
|
||
{"status": "healthy"}
|
||
```
|
||
|
||
**GET `/info`** - информация о сервисе
|
||
```json
|
||
{
|
||
"service": "dataloader",
|
||
"version": "1.0.0",
|
||
"environment": "production"
|
||
}
|
||
```
|
||
|
||
|
||
## Воркеры, пайплайны и добавление новых ETL‑задач
|
||
|
||
### Как работает воркер
|
||
1. Ожидает сигнал (LISTEN/NOTIFY) или таймаут `DL_CLAIM_BACKOFF_SEC`.
|
||
2. Пытается забрать одну задачу своей очереди: `status='queued' AND available_at<=now()` с `FOR UPDATE SKIP LOCKED`.
|
||
3. Переводит в `running`, увеличивает `attempt`, выставляет `lease_expires_at`, делает heartbeat каждые `DL_HEARTBEAT_SEC`.
|
||
4. Захватывает advisory‑lock по `lock_key` (если не получилось - возвращает в `queued` с бэкоффом).
|
||
5. Выполняет пайплайн (`task`) с поддержкой итеративных шагов и кооперативной отмены.
|
||
6. По завершении: `succeeded` или `failed`/`canceled`; при ошибках возможны ретраи до `max_attempts`.
|
||
|
||
### Протокол выполнения задачи (SQL)
|
||
|
||
**Claim (захват задачи):**
|
||
```sql
|
||
WITH cte AS (
|
||
SELECT job_id
|
||
FROM dl_jobs
|
||
WHERE status = 'queued'
|
||
AND queue = :queue
|
||
AND available_at <= now()
|
||
ORDER BY priority ASC, created_at ASC
|
||
FOR UPDATE SKIP LOCKED
|
||
LIMIT 1
|
||
)
|
||
UPDATE dl_jobs j
|
||
SET status = 'running',
|
||
started_at = COALESCE(started_at, now()),
|
||
attempt = attempt + 1,
|
||
lease_expires_at = now() + make_interval(secs => j.lease_ttl_sec),
|
||
heartbeat_at = now()
|
||
FROM cte
|
||
WHERE j.job_id = cte.job_id
|
||
RETURNING j.job_id, j.task, j.args, j.lock_key, j.lease_ttl_sec;
|
||
```
|
||
|
||
**Heartbeat (продление аренды):**
|
||
```sql
|
||
UPDATE dl_jobs
|
||
SET heartbeat_at = now(),
|
||
lease_expires_at = now() + make_interval(secs => :ttl)
|
||
WHERE job_id = :job_id AND status = 'running'
|
||
RETURNING cancel_requested;
|
||
```
|
||
|
||
**Завершение успешное:**
|
||
```sql
|
||
UPDATE dl_jobs
|
||
SET status = 'succeeded',
|
||
finished_at = now(),
|
||
lease_expires_at = NULL
|
||
WHERE job_id = :job_id;
|
||
|
||
SELECT pg_advisory_unlock(hashtext(:lock_key));
|
||
```
|
||
|
||
**Завершение с ошибкой (retry):**
|
||
```sql
|
||
UPDATE dl_jobs
|
||
SET status = CASE WHEN attempt < max_attempts THEN 'queued' ELSE 'failed' END,
|
||
available_at = CASE WHEN attempt < max_attempts
|
||
THEN now() + make_interval(secs => 30 * attempt)
|
||
ELSE now() END,
|
||
error = :error_message,
|
||
lease_expires_at = NULL,
|
||
finished_at = CASE WHEN attempt >= max_attempts THEN now() ELSE NULL END
|
||
WHERE job_id = :job_id;
|
||
|
||
SELECT pg_advisory_unlock(hashtext(:lock_key));
|
||
```
|
||
|
||
**Reaper (возврат потерянных задач):**
|
||
```sql
|
||
UPDATE dl_jobs
|
||
SET status = 'queued',
|
||
available_at = now(),
|
||
lease_expires_at = NULL
|
||
WHERE status = 'running'
|
||
AND lease_expires_at IS NOT NULL
|
||
AND lease_expires_at < now()
|
||
RETURNING job_id;
|
||
```
|
||
|
||
### Интерфейс пайплайна
|
||
Пайплайн - обычная функция, возвращающая одно из:
|
||
- асинхронный генератор шагов (рекомендуется для длинных процессов),
|
||
- корутину,
|
||
- синхронную функцию.
|
||
|
||
Каждый «yield» в асинхронном генераторе - безопасная точка, где воркер выполнит heartbeat и проверит `cancel_requested`.
|
||
|
||
Регистрация нового пайплайна через декоратор `@register("task_name")` в модуле `src/dataloader/workers/pipelines/<your_task>.py`.
|
||
|
||
Пример:
|
||
```python
|
||
from __future__ import annotations
|
||
import asyncio
|
||
from typing import AsyncIterator
|
||
from dataloader.workers.pipelines.registry import register
|
||
|
||
@register("load.customers")
|
||
async def load_customers(args: dict) -> AsyncIterator[None]:
|
||
# шаг 1 – вытягиваем данные
|
||
await some_fetch(args)
|
||
yield
|
||
# шаг 2 – пишем в БД идемпотентно (upsert/merge)
|
||
await upsert_customers(args)
|
||
yield
|
||
# шаг 3 – пост‑обработка
|
||
await finalize(args)
|
||
yield
|
||
```
|
||
|
||
Важно:
|
||
- Пайплайны должны быть идемпотентны (повторный запуск не должен ломать данные).
|
||
- Долгие операции разбивайте на шаги с `yield`, чтобы работал heartbeat и отмена.
|
||
- Для бизнес‑взаимного исключения выбирайте корректный `lock_key` (например, `customer:{id}`), чтобы параллельные задачи не конфликтовали.
|
||
|
||
### Добавление ETL‑задачи (шаги)
|
||
|
||
**1. Создать пайплайн** в `src/dataloader/workers/pipelines/`:
|
||
|
||
```python
|
||
# src/dataloader/workers/pipelines/load_cbr_rates.py
|
||
from __future__ import annotations
|
||
from typing import AsyncIterator
|
||
from datetime import datetime
|
||
from dataloader.workers.pipelines.registry import register
|
||
|
||
@register("load.cbr.rates")
|
||
async def load_cbr_rates(args: dict) -> AsyncIterator[None]:
|
||
"""
|
||
Загрузка курсов валют ЦБ РФ.
|
||
|
||
Args:
|
||
args: {"date": "YYYY-MM-DD", "currencies": ["USD", "EUR"]}
|
||
"""
|
||
date = datetime.fromisoformat(args["date"])
|
||
currencies = args.get("currencies", ["USD", "EUR"])
|
||
|
||
# Извлечение данных
|
||
data = await fetch_cbr_rates(date, currencies)
|
||
yield # Heartbeat checkpoint
|
||
|
||
# Трансформация
|
||
transformed = transform_rates(data)
|
||
yield # Heartbeat checkpoint
|
||
|
||
# Идемпотентная загрузка в БД
|
||
async with get_target_session() as session:
|
||
await session.execute(
|
||
insert(CbrRates)
|
||
.values(transformed)
|
||
.on_conflict_do_update(
|
||
index_elements=["date", "currency"],
|
||
set_={"rate": excluded.c.rate, "updated_at": func.now()}
|
||
)
|
||
)
|
||
await session.commit()
|
||
yield
|
||
```
|
||
|
||
**2. Настроить воркеры** в `.env`:
|
||
|
||
```bash
|
||
WORKERS_JSON='[{"queue":"load.cbr","concurrency":2}]'
|
||
```
|
||
|
||
**3. Поставить задачу через API**:
|
||
|
||
```bash
|
||
curl -X POST http://localhost:8081/api/v1/jobs/trigger \
|
||
-H "Content-Type: application/json" \
|
||
-d '{
|
||
"queue": "load.cbr",
|
||
"task": "load.cbr.rates",
|
||
"args": {"date": "2025-01-10", "currencies": ["USD", "EUR"]},
|
||
"lock_key": "cbr_rates_2025-01-10",
|
||
"partition_key": "2025-01-10",
|
||
"priority": 100,
|
||
"max_attempts": 3,
|
||
"lease_ttl_sec": 300
|
||
}'
|
||
```
|
||
|
||
**4. Мониторить выполнение**:
|
||
|
||
```bash
|
||
# Получить job_id из ответа и проверить статус
|
||
curl http://localhost:8081/api/v1/jobs/{job_id}/status
|
||
```
|
||
|
||
**Ключевые моменты**:
|
||
- Пайплайн должен быть идемпотентным (повторный запуск не должен ломать данные)
|
||
- Используйте `yield` после каждого значимого чанка работы для heartbeat
|
||
- `lock_key` должен обеспечивать взаимное исключение (например, `customer:{id}`)
|
||
- `partition_key` используется для параллелизации независимых задач
|
||
|
||
|
||
## Логирование, метрики, аудит
|
||
- Логи: структурированные, через `logger/*`. Middleware (`api/middleware.py`) логирует входящие запросы/исходящие ответы, время обработки, пишет метрики и аудит‑события.
|
||
- Метрики: простые счётчики (пример: likes/dislikes, requests_total, responses_total, duration_ms).
|
||
- Аудит: запись бизнес‑событий начала/окончания обработки запроса.
|
||
|
||
|
||
## Тестирование
|
||
|
||
### Структура тестов
|
||
|
||
```
|
||
tests/
|
||
├── conftest.py # Глобальные фикстуры (db_engine, db_session, client)
|
||
├── integration_tests/ # Интеграционные тесты с реальной БД
|
||
│ ├── test_queue_repository.py # 12 тестов репозитория
|
||
│ └── test_api_endpoints.py # 7 тестов API endpoints
|
||
└── unit/ # Юнит-тесты с моками (92 теста)
|
||
├── test_config.py # 30 тестов конфигурации
|
||
├── test_context.py # 13 тестов AppContext
|
||
├── test_api_service.py # 10 тестов сервисного слоя
|
||
├── test_notify_listener.py # 13 тестов LISTEN/NOTIFY
|
||
├── test_workers_base.py # 14 тестов PGWorker
|
||
├── test_workers_manager.py # 10 тестов WorkerManager
|
||
└── test_pipeline_registry.py # 5 тестов реестра пайплайнов
|
||
```
|
||
|
||
### Запуск тестов
|
||
|
||
```bash
|
||
# Все тесты (111 тестов)
|
||
poetry run pytest
|
||
|
||
# Только юнит-тесты
|
||
poetry run pytest tests/unit/ -m unit
|
||
|
||
# Только интеграционные
|
||
poetry run pytest tests/integration_tests/ -m integration
|
||
|
||
# С покрытием кода
|
||
poetry run pytest --cov=dataloader --cov-report=html
|
||
|
||
# С подробным выводом
|
||
poetry run pytest -v -s
|
||
```
|
||
|
||
### Покрытие кода
|
||
|
||
Текущее покрытие: **95.50%** (778 строк / 743 покрыто)
|
||
|
||
```
|
||
Name Stmts Miss Cover Missing
|
||
--------------------------------------------------------------------------------
|
||
src\dataloader\__init__.py 2 0 100.00%
|
||
src\dataloader\api\__init__.py 31 10 67.74% 28-38
|
||
src\dataloader\api\metric_router.py 14 4 71.43% 23-27, 39-43
|
||
src\dataloader\api\os_router.py 11 2 81.82% 31-33
|
||
src\dataloader\api\schemas.py 13 0 100.00%
|
||
src\dataloader\api\v1\__init__.py 2 0 100.00%
|
||
src\dataloader\api\v1\exceptions.py 4 0 100.00%
|
||
src\dataloader\api\v1\models.py 0 0 100.00%
|
||
src\dataloader\api\v1\router.py 29 0 100.00%
|
||
src\dataloader\api\v1\schemas.py 33 1 96.97% 34
|
||
src\dataloader\api\v1\service.py 32 0 100.00%
|
||
src\dataloader\api\v1\utils.py 4 0 100.00%
|
||
src\dataloader\config.py 79 0 100.00%
|
||
src\dataloader\context.py 39 0 100.00%
|
||
src\dataloader\exceptions.py 0 0 100.00%
|
||
src\dataloader\storage\__init__.py 0 0 100.00%
|
||
src\dataloader\storage\engine.py 9 1 88.89% 52
|
||
src\dataloader\storage\models\__init__.py 4 0 100.00%
|
||
src\dataloader\storage\models\base.py 4 0 100.00%
|
||
src\dataloader\storage\models\queue.py 43 0 100.00%
|
||
src\dataloader\storage\notify_listener.py 49 0 100.00%
|
||
src\dataloader\storage\repositories\__init__.py 3 0 100.00%
|
||
src\dataloader\storage\repositories\queue.py 130 0 100.00%
|
||
src\dataloader\storage\schemas\__init__.py 3 0 100.00%
|
||
src\dataloader\storage\schemas\queue.py 29 0 100.00%
|
||
src\dataloader\workers\__init__.py 0 0 100.00%
|
||
src\dataloader\workers\base.py 102 0 100.00%
|
||
src\dataloader\workers\manager.py 64 0 100.00%
|
||
src\dataloader\workers\pipelines\__init__.py 11 5 54.55% 15-17, 24-25
|
||
src\dataloader\workers\pipelines\noop.py 12 12 0.00% 2-20
|
||
src\dataloader\workers\pipelines\registry.py 15 0 100.00%
|
||
src\dataloader\workers\reaper.py 7 0 100.00%
|
||
--------------------------------------------------------------------------------
|
||
TOTAL 778 35 95.50%
|
||
```
|
||
|
||
### Ключевые тест-сценарии
|
||
|
||
**Интеграционные тесты:**
|
||
- Постановка задачи через API → проверка статуса
|
||
- Идемпотентность через `idempotency_key`
|
||
- Claim задачи → heartbeat → успешное завершение
|
||
- Claim задачи → ошибка → retry → финальный fail
|
||
- Конкуренция воркеров через advisory lock
|
||
- Возврат потерянных задач (reaper)
|
||
- Отмена задачи пользователем
|
||
|
||
**Юнит-тесты:**
|
||
- Конфигурация из переменных окружения
|
||
- Создание и управление воркерами
|
||
- LISTEN/NOTIFY механизм
|
||
- Сервисный слой и репозиторий
|
||
- Протокол heartbeat и отмены
|
||
|
||
|
||
### Масштабирование
|
||
|
||
- **Вертикальное**: Увеличение `concurrency` в `WORKERS_JSON` для существующих воркеров
|
||
- **Горизонтальное**: Увеличение количества реплик (pods). Очередь в БД и advisory-lock обеспечат корректность при конкуренции между репликами
|
||
- **По очередям**: Разные deployment'ы для разных очередей с разными ресурсами
|
||
|
||
### Graceful Shutdown
|
||
|
||
При получении SIGTERM:
|
||
1. Останавливает прием новых HTTP запросов
|
||
2. Сигнализирует воркерам о необходимости завершения
|
||
3. Ждет завершения текущих задач (timeout 30 сек)
|
||
4. Останавливает reaper
|
||
5. Закрывает соединения с БД
|
||
|
||
### Мониторинг
|
||
|
||
**Health Checks:**
|
||
- `GET /health` - проверка работоспособности (без БД, < 20ms)
|
||
- `GET /info` - информация о версии
|
||
|
||
**Метрики (если включен metric_router):**
|
||
- Количество задач по статусам (queued, running, succeeded, failed)
|
||
- Время выполнения задач (p50, p95, p99)
|
||
- Количество активных воркеров
|
||
- Частота ошибок
|
||
|
||
**Логи:**
|
||
Структурированные JSON-логи с уровнями: DEBUG, INFO, WARNING, ERROR, CRITICAL
|
||
|
||
**Ключевые события для алертов:**
|
||
- `worker.claim.backoff` - частые backoff'ы (возможна конкуренция)
|
||
- `worker.complete.failed` - высокий процент ошибок
|
||
- `reaper.requeued` - частый возврат потерянных задач (проблемы с lease)
|
||
- `api.error` - ошибки API
|
||
|
||
|
||
## Troubleshooting
|
||
|
||
### Задачи застревают в статусе `queued`
|
||
|
||
**Симптомы:** Задачи не начинают выполняться, остаются в `queued`.
|
||
|
||
**Возможные причины:**
|
||
1. Воркеры не запущены или упали
|
||
2. Нет воркеров для данной очереди в `WORKERS_JSON`
|
||
3. `available_at` в будущем
|
||
|
||
**Решение:**
|
||
```bash
|
||
# Проверить логи воркеров
|
||
docker logs dataloader | grep worker
|
||
|
||
# Проверить конфигурацию
|
||
echo $WORKERS_JSON
|
||
|
||
# Проверить задачи в БД
|
||
SELECT job_id, queue, status, available_at, created_at
|
||
FROM dl_jobs
|
||
WHERE status = 'queued'
|
||
ORDER BY created_at DESC
|
||
LIMIT 10;
|
||
```
|
||
|
||
### Задачи часто возвращаются в `queued` (backoff)
|
||
|
||
**Симптомы:** В логах частые события `worker.claim.backoff`.
|
||
|
||
**Причины:**
|
||
- Конкуренция за `lock_key`: несколько задач с одинаковым `lock_key` одновременно
|
||
- Advisory lock уже занят другим процессом
|
||
|
||
**Решение:**
|
||
- Проверить корректность выбора `lock_key` (должен быть уникальным для бизнес-сущности)
|
||
- Использовать `partition_key` для распределения нагрузки
|
||
- Снизить `concurrency` для данной очереди
|
||
|
||
### Высокий процент `failed` задач
|
||
|
||
**Симптомы:** Много задач завершаются с `status = 'failed'`.
|
||
|
||
**Диагностика:**
|
||
```sql
|
||
SELECT job_id, task, error, attempt, max_attempts
|
||
FROM dl_jobs
|
||
WHERE status = 'failed'
|
||
ORDER BY finished_at DESC
|
||
LIMIT 20;
|
||
```
|
||
|
||
**Возможные причины:**
|
||
- Ошибки в коде пайплайна
|
||
- Недоступность внешних сервисов
|
||
- Таймауты (превышение `lease_ttl_sec`)
|
||
- Неверные аргументы в `args`
|
||
|
||
**Решение:**
|
||
- Проверить логи с `job_id`
|
||
- Увеличить `max_attempts` для retry
|
||
- Увеличить `lease_ttl_sec` для долгих операций
|
||
- Исправить код пайплайна
|
||
|
||
|
||
### Медленное выполнение задач
|
||
|
||
**Симптомы:** Задачи выполняются дольше ожидаемого.
|
||
|
||
**Диагностика:**
|
||
```sql
|
||
SELECT
|
||
task,
|
||
AVG(EXTRACT(EPOCH FROM (finished_at - started_at))) as avg_duration_sec,
|
||
COUNT(*) as total
|
||
FROM dl_jobs
|
||
WHERE status IN ('succeeded', 'failed')
|
||
AND finished_at > NOW() - INTERVAL '1 hour'
|
||
GROUP BY task
|
||
ORDER BY avg_duration_sec DESC;
|
||
```
|
||
|
||
**Возможные причины:**
|
||
- Неоптимальный код пайплайна
|
||
- Медленные внешние сервисы
|
||
- Недостаточно воркеров (`concurrency` слишком мал)
|
||
- Проблемы с БД (медленные запросы, блокировки)
|
||
|
||
**Решение:**
|
||
- Профилировать код пайплайна
|
||
- Увеличить `concurrency` в `WORKERS_JSON`
|
||
- Оптимизировать запросы к БД (индексы, batching)
|
||
- Масштабировать горизонтально (больше реплик)
|
||
|
||
### Утечка памяти
|
||
|
||
**Симптомы:** Постепенный рост потребления памяти, OOM kills.
|
||
|
||
**Диагностика:**
|
||
```bash
|
||
# Мониторинг памяти
|
||
kubectl top pods -l app=dataloader --containers
|
||
|
||
# Проверить логи перед падением
|
||
kubectl logs dataloader-xxx --previous
|
||
```
|
||
|
||
**Возможные причины:**
|
||
- Накопление объектов в памяти в пайплайне
|
||
- Незакрытые соединения/файлы
|
||
- Утечки в зависимостях
|
||
|
||
**Решение:**
|
||
- Использовать context managers (`async with`) для ресурсов
|
||
- Обрабатывать данные чанками, не загружать всё в память
|
||
- Периодически перезапускать воркеры (restart policy)
|
||
|
||
### Проблемы с LISTEN/NOTIFY
|
||
|
||
**Симптомы:** Воркеры не просыпаются сразу после постановки задачи.
|
||
|
||
**Диагностика:**
|
||
```bash
|
||
# Проверить логи listener
|
||
docker logs dataloader | grep "notify_listener\|LISTEN"
|
||
|
||
# Проверить триггеры в БД
|
||
SELECT * FROM pg_trigger WHERE tgname LIKE 'dl_jobs%';
|
||
```
|
||
|
||
**Возможные причины:**
|
||
- Триггеры не созданы или отключены
|
||
- Проблемы с подключением asyncpg
|
||
- Воркер не подписан на канал
|
||
|
||
**Решение:**
|
||
- Воркер автоматически fallback'ится на polling при проблемах с LISTEN/NOTIFY
|
||
- Проверить DDL: триггеры `dl_jobs_notify_ins` и `dl_jobs_notify_upd`
|
||
- Проверить права пользователя БД на LISTEN/NOTIFY
|