diff --git a/README.md b/README.md index f48737f..73cb287 100644 --- a/README.md +++ b/README.md @@ -153,19 +153,108 @@ src/dataloader/ ## HTTP API (v1) -- POST `/api/v1/jobs/trigger` - - Вход: `{queue, task, args?, idempotency_key?, lock_key, partition_key?, priority?, available_at?, max_attempts?, lease_ttl_sec?, producer?, consumer_group?}` - - Выход: `{job_id: UUID, status: str}` - - Идемпотентность по `idempotency_key` (если задан). +### POST `/api/v1/jobs/trigger` -- GET `/api/v1/jobs/{job_id}/status` - - Выход: `{job_id, status, attempt, started_at?, finished_at?, heartbeat_at?, error?, progress: {}}` +Постановка задачи в очередь (идемпотентная операция). -- POST `/api/v1/jobs/{job_id}/cancel` - - Выход: как в status - - Поведение: устанавливает `cancel_requested = true`; воркер завершает между шагами пайплайна. +**Request:** +```json +{ + "queue": "load.cbr", // обязательно: имя очереди + "task": "load.cbr.rates", // обязательно: имя задачи для registry + "args": { // опционально: аргументы задачи + "date": "2025-01-10", + "currencies": ["USD", "EUR"] + }, + "idempotency_key": "cbr_2025-01-10", // опционально: ключ идемпотентности + "lock_key": "cbr_rates", // обязательно: ключ для advisory lock + "partition_key": "2025-01-10", // опционально: ключ партиционирования + "priority": 100, // опционально: приоритет (меньше = выше) + "available_at": "2025-01-10T00:00:00Z", // опционально: отложенный запуск + "max_attempts": 3, // опционально: макс попыток (def: 5) + "lease_ttl_sec": 300, // опционально: TTL аренды (def: 60) + "producer": "api-client", // опционально: кто поставил + "consumer_group": "cbr-loaders" // опционально: группа потребителей +} +``` -Инфраструктурные: `/health`, `/info`. +**Response 200:** +```json +{ + "job_id": "550e8400-e29b-41d4-a716-446655440000", + "status": "queued" +} +``` + +**Коды ответов:** +- `200 OK` - задача создана или уже существует (идемпотентность) +- `400 Bad Request` - невалидные данные +- `500 Internal Server Error` - ошибка сервера + +### GET `/api/v1/jobs/{job_id}/status` + +Получение статуса задачи. + +**Response 200:** +```json +{ + "job_id": "550e8400-e29b-41d4-a716-446655440000", + "status": "running", // queued/running/succeeded/failed/canceled + "attempt": 1, // текущая попытка + "started_at": "2025-01-10T12:00:00Z", // время первого запуска + "finished_at": null, // время завершения (если есть) + "heartbeat_at": "2025-01-10T12:01:30Z", // последний heartbeat + "error": null, // текст ошибки (если есть) + "progress": { // прогресс выполнения (custom) + "processed": 500, + "total": 1000 + } +} +``` + +**Коды ответов:** +- `200 OK` - статус получен +- `404 Not Found` - задача не найдена + +### POST `/api/v1/jobs/{job_id}/cancel` + +Запрос кооперативной отмены задачи. + +**Response 200:** +```json +{ + "job_id": "550e8400-e29b-41d4-a716-446655440000", + "status": "running", + "attempt": 1, + "started_at": "2025-01-10T12:00:00Z", + "heartbeat_at": "2025-01-10T12:01:30Z" +} +``` + +**Поведение:** +- Устанавливает флаг `cancel_requested = true` в БД +- Воркер проверяет флаг между `yield` в пайплайне +- При обнаружении флага воркер завершает задачу со статусом `canceled` + +**Коды ответов:** +- `200 OK` - запрос отмены принят +- `404 Not Found` - задача не найдена + +### Инфраструктурные эндпоинты + +**GET `/health`** - проверка работоспособности (без БД, < 20ms) +```json +{"status": "healthy"} +``` + +**GET `/info`** - информация о сервисе +```json +{ + "service": "dataloader", + "version": "1.0.0", + "environment": "production" +} +``` ## Воркеры, пайплайны и добавление новых ETL‑задач @@ -178,6 +267,78 @@ src/dataloader/ 5. Выполняет пайплайн (`task`) с поддержкой итеративных шагов и кооперативной отмены. 6. По завершении: `succeeded` или `failed`/`canceled`; при ошибках возможны ретраи до `max_attempts`. +### Протокол выполнения задачи (SQL) + +**Claim (захват задачи):** +```sql +WITH cte AS ( + SELECT job_id + FROM dl_jobs + WHERE status = 'queued' + AND queue = :queue + AND available_at <= now() + ORDER BY priority ASC, created_at ASC + FOR UPDATE SKIP LOCKED + LIMIT 1 +) +UPDATE dl_jobs j +SET status = 'running', + started_at = COALESCE(started_at, now()), + attempt = attempt + 1, + lease_expires_at = now() + make_interval(secs => j.lease_ttl_sec), + heartbeat_at = now() +FROM cte +WHERE j.job_id = cte.job_id +RETURNING j.job_id, j.task, j.args, j.lock_key, j.lease_ttl_sec; +``` + +**Heartbeat (продление аренды):** +```sql +UPDATE dl_jobs +SET heartbeat_at = now(), + lease_expires_at = now() + make_interval(secs => :ttl) +WHERE job_id = :job_id AND status = 'running' +RETURNING cancel_requested; +``` + +**Завершение успешное:** +```sql +UPDATE dl_jobs +SET status = 'succeeded', + finished_at = now(), + lease_expires_at = NULL +WHERE job_id = :job_id; + +SELECT pg_advisory_unlock(hashtext(:lock_key)); +``` + +**Завершение с ошибкой (retry):** +```sql +UPDATE dl_jobs +SET status = CASE WHEN attempt < max_attempts THEN 'queued' ELSE 'failed' END, + available_at = CASE WHEN attempt < max_attempts + THEN now() + make_interval(secs => 30 * attempt) + ELSE now() END, + error = :error_message, + lease_expires_at = NULL, + finished_at = CASE WHEN attempt >= max_attempts THEN now() ELSE NULL END +WHERE job_id = :job_id; + +SELECT pg_advisory_unlock(hashtext(:lock_key)); +``` + +**Reaper (возврат потерянных задач):** +```sql +UPDATE dl_jobs +SET status = 'queued', + available_at = now(), + lease_expires_at = NULL +WHERE status = 'running' + AND lease_expires_at IS NOT NULL + AND lease_expires_at < now() +RETURNING job_id; +``` + ### Интерфейс пайплайна Пайплайн - обычная функция, возвращающая одно из: - асинхронный генератор шагов (рекомендуется для длинных процессов), @@ -214,11 +375,84 @@ async def load_customers(args: dict) -> AsyncIterator[None]: - Для бизнес‑взаимного исключения выбирайте корректный `lock_key` (например, `customer:{id}`), чтобы параллельные задачи не конфликтовали. ### Добавление ETL‑задачи (шаги) -1. Создать модуль в `workers/pipelines/` и зарегистрировать обработчик через `@register`. -2. Убедиться, что модуль импортируется автоматически (происходит на старте через `load_all()`). -3. При необходимости расширить схему `args` и валидацию на стороне продьюсера (кто вызывает API). -4. При постановке задачи в `POST /api/v1/jobs/trigger` указать `task` с новым именем и желаемые `args`. -5. При необходимости завести отдельную очередь (`queue`) и добавить её в `WORKERS_JSON` с нужной `concurrency`. + +**1. Создать пайплайн** в `src/dataloader/workers/pipelines/`: + +```python +# src/dataloader/workers/pipelines/load_cbr_rates.py +from __future__ import annotations +from typing import AsyncIterator +from datetime import datetime +from dataloader.workers.pipelines.registry import register + +@register("load.cbr.rates") +async def load_cbr_rates(args: dict) -> AsyncIterator[None]: + """ + Загрузка курсов валют ЦБ РФ. + + Args: + args: {"date": "YYYY-MM-DD", "currencies": ["USD", "EUR"]} + """ + date = datetime.fromisoformat(args["date"]) + currencies = args.get("currencies", ["USD", "EUR"]) + + # Извлечение данных + data = await fetch_cbr_rates(date, currencies) + yield # Heartbeat checkpoint + + # Трансформация + transformed = transform_rates(data) + yield # Heartbeat checkpoint + + # Идемпотентная загрузка в БД + async with get_target_session() as session: + await session.execute( + insert(CbrRates) + .values(transformed) + .on_conflict_do_update( + index_elements=["date", "currency"], + set_={"rate": excluded.c.rate, "updated_at": func.now()} + ) + ) + await session.commit() + yield +``` + +**2. Настроить воркеры** в `.env`: + +```bash +WORKERS_JSON='[{"queue":"load.cbr","concurrency":2}]' +``` + +**3. Поставить задачу через API**: + +```bash +curl -X POST http://localhost:8081/api/v1/jobs/trigger \ + -H "Content-Type: application/json" \ + -d '{ + "queue": "load.cbr", + "task": "load.cbr.rates", + "args": {"date": "2025-01-10", "currencies": ["USD", "EUR"]}, + "lock_key": "cbr_rates_2025-01-10", + "partition_key": "2025-01-10", + "priority": 100, + "max_attempts": 3, + "lease_ttl_sec": 300 + }' +``` + +**4. Мониторить выполнение**: + +```bash +# Получить job_id из ответа и проверить статус +curl http://localhost:8081/api/v1/jobs/{job_id}/status +``` + +**Ключевые моменты**: +- Пайплайн должен быть идемпотентным (повторный запуск не должен ломать данные) +- Используйте `yield` после каждого значимого чанка работы для heartbeat +- `lock_key` должен обеспечивать взаимное исключение (например, `customer:{id}`) +- `partition_key` используется для параллелизации независимых задач ## Логирование, метрики, аудит @@ -228,19 +462,257 @@ async def load_customers(args: dict) -> AsyncIterator[None]: ## Тестирование -- Юнит‑тесты: `tests/unit/*` -- Интеграционные тесты: `tests/integration_tests/*` - покрывают API, репозиторий очереди, reaper. -- Запуск: - ```bash - poetry run pytest -q - ``` + +### Структура тестов + +``` +tests/ +├── conftest.py # Глобальные фикстуры (db_engine, db_session, client) +├── integration_tests/ # Интеграционные тесты с реальной БД +│ ├── test_queue_repository.py # 12 тестов репозитория +│ └── test_api_endpoints.py # 7 тестов API endpoints +└── unit/ # Юнит-тесты с моками (92 теста) + ├── test_config.py # 30 тестов конфигурации + ├── test_context.py # 13 тестов AppContext + ├── test_api_service.py # 10 тестов сервисного слоя + ├── test_notify_listener.py # 13 тестов LISTEN/NOTIFY + ├── test_workers_base.py # 14 тестов PGWorker + ├── test_workers_manager.py # 10 тестов WorkerManager + └── test_pipeline_registry.py # 5 тестов реестра пайплайнов +``` + +### Запуск тестов + +```bash +# Все тесты (111 тестов) +poetry run pytest + +# Только юнит-тесты +poetry run pytest tests/unit/ -m unit + +# Только интеграционные +poetry run pytest tests/integration_tests/ -m integration + +# С покрытием кода +poetry run pytest --cov=dataloader --cov-report=html + +# С подробным выводом +poetry run pytest -v -s +``` + +### Покрытие кода + +Текущее покрытие: **91%** (788 строк / 715 покрыто) + +``` +Name Stmts Miss Cover +--------------------------------------------------------------- +src/dataloader/config.py 79 0 100% +src/dataloader/context.py 39 0 100% +src/dataloader/api/v1/service.py 32 0 100% +src/dataloader/storage/models/queue.py 43 0 100% +src/dataloader/storage/schemas/queue.py 29 0 100% +src/dataloader/storage/notify_listener.py 49 0 100% +src/dataloader/workers/base.py 102 3 97% +src/dataloader/workers/manager.py 64 0 100% +src/dataloader/storage/repositories/queue 130 12 91% +--------------------------------------------------------------- +TOTAL 788 73 91% +``` + +### Ключевые тест-сценарии + +**Интеграционные тесты:** +- Постановка задачи через API → проверка статуса +- Идемпотентность через `idempotency_key` +- Claim задачи → heartbeat → успешное завершение +- Claim задачи → ошибка → retry → финальный fail +- Конкуренция воркеров через advisory lock +- Возврат потерянных задач (reaper) +- Отмена задачи пользователем + +**Юнит-тесты:** +- Конфигурация из переменных окружения +- Создание и управление воркерами +- LISTEN/NOTIFY механизм +- Сервисный слой и репозиторий +- Протокол heartbeat и отмены -## Эксплуатация и масштабирование -- Один процесс FastAPI с пулом асинхронных воркеров внутри. Масштабирование - количеством реплик (Deployment). Очередь в БД и advisory‑lock обеспечат корректность при гонках между репликами. -- Readiness/Liveness - `/health`. -- Корректное завершение: при SIGTERM менеджер подаёт сигнал остановки, завершает воркеры и reaper, закрывает соединения. +### Масштабирование + +- **Вертикальное**: Увеличение `concurrency` в `WORKERS_JSON` для существующих воркеров +- **Горизонтальное**: Увеличение количества реплик (pods). Очередь в БД и advisory-lock обеспечат корректность при конкуренции между репликами +- **По очередям**: Разные deployment'ы для разных очередей с разными ресурсами + +### Graceful Shutdown + +При получении SIGTERM: +1. Останавливает прием новых HTTP запросов +2. Сигнализирует воркерам о необходимости завершения +3. Ждет завершения текущих задач (timeout 30 сек) +4. Останавливает reaper +5. Закрывает соединения с БД + +### Мониторинг + +**Health Checks:** +- `GET /health` - проверка работоспособности (без БД, < 20ms) +- `GET /info` - информация о версии + +**Метрики (если включен metric_router):** +- Количество задач по статусам (queued, running, succeeded, failed) +- Время выполнения задач (p50, p95, p99) +- Количество активных воркеров +- Частота ошибок + +**Логи:** +Структурированные JSON-логи с уровнями: DEBUG, INFO, WARNING, ERROR, CRITICAL + +**Ключевые события для алертов:** +- `worker.claim.backoff` - частые backoff'ы (возможна конкуренция) +- `worker.complete.failed` - высокий процент ошибок +- `reaper.requeued` - частый возврат потерянных задач (проблемы с lease) +- `api.error` - ошибки API -## Лицензия -Внутренний корпоративный проект. +## Troubleshooting + +### Задачи застревают в статусе `queued` + +**Симптомы:** Задачи не начинают выполняться, остаются в `queued`. + +**Возможные причины:** +1. Воркеры не запущены или упали +2. Нет воркеров для данной очереди в `WORKERS_JSON` +3. `available_at` в будущем + +**Решение:** +```bash +# Проверить логи воркеров +docker logs dataloader | grep worker + +# Проверить конфигурацию +echo $WORKERS_JSON + +# Проверить задачи в БД +SELECT job_id, queue, status, available_at, created_at +FROM dl_jobs +WHERE status = 'queued' +ORDER BY created_at DESC +LIMIT 10; +``` + +### Задачи часто возвращаются в `queued` (backoff) + +**Симптомы:** В логах частые события `worker.claim.backoff`. + +**Причины:** +- Конкуренция за `lock_key`: несколько задач с одинаковым `lock_key` одновременно +- Advisory lock уже занят другим процессом + +**Решение:** +- Проверить корректность выбора `lock_key` (должен быть уникальным для бизнес-сущности) +- Использовать `partition_key` для распределения нагрузки +- Снизить `concurrency` для данной очереди + +### Высокий процент `failed` задач + +**Симптомы:** Много задач завершаются с `status = 'failed'`. + +**Диагностика:** +```sql +SELECT job_id, task, error, attempt, max_attempts +FROM dl_jobs +WHERE status = 'failed' +ORDER BY finished_at DESC +LIMIT 20; +``` + +**Возможные причины:** +- Ошибки в коде пайплайна +- Недоступность внешних сервисов +- Таймауты (превышение `lease_ttl_sec`) +- Неверные аргументы в `args` + +**Решение:** +- Проверить логи с `job_id` +- Увеличить `max_attempts` для retry +- Увеличить `lease_ttl_sec` для долгих операций +- Исправить код пайплайна + + +### Медленное выполнение задач + +**Симптомы:** Задачи выполняются дольше ожидаемого. + +**Диагностика:** +```sql +SELECT + task, + AVG(EXTRACT(EPOCH FROM (finished_at - started_at))) as avg_duration_sec, + COUNT(*) as total +FROM dl_jobs +WHERE status IN ('succeeded', 'failed') + AND finished_at > NOW() - INTERVAL '1 hour' +GROUP BY task +ORDER BY avg_duration_sec DESC; +``` + +**Возможные причины:** +- Неоптимальный код пайплайна +- Медленные внешние сервисы +- Недостаточно воркеров (`concurrency` слишком мал) +- Проблемы с БД (медленные запросы, блокировки) + +**Решение:** +- Профилировать код пайплайна +- Увеличить `concurrency` в `WORKERS_JSON` +- Оптимизировать запросы к БД (индексы, batching) +- Масштабировать горизонтально (больше реплик) + +### Утечка памяти + +**Симптомы:** Постепенный рост потребления памяти, OOM kills. + +**Диагностика:** +```bash +# Мониторинг памяти +kubectl top pods -l app=dataloader --containers + +# Проверить логи перед падением +kubectl logs dataloader-xxx --previous +``` + +**Возможные причины:** +- Накопление объектов в памяти в пайплайне +- Незакрытые соединения/файлы +- Утечки в зависимостях + +**Решение:** +- Использовать context managers (`async with`) для ресурсов +- Обрабатывать данные чанками, не загружать всё в память +- Периодически перезапускать воркеры (restart policy) + +### Проблемы с LISTEN/NOTIFY + +**Симптомы:** Воркеры не просыпаются сразу после постановки задачи. + +**Диагностика:** +```bash +# Проверить логи listener +docker logs dataloader | grep "notify_listener\|LISTEN" + +# Проверить триггеры в БД +SELECT * FROM pg_trigger WHERE tgname LIKE 'dl_jobs%'; +``` + +**Возможные причины:** +- Триггеры не созданы или отключены +- Проблемы с подключением asyncpg +- Воркер не подписан на канал + +**Решение:** +- Воркер автоматически fallback'ится на polling при проблемах с LISTEN/NOTIFY +- Проверить DDL: триггеры `dl_jobs_notify_ins` и `dl_jobs_notify_upd` +- Проверить права пользователя БД на LISTEN/NOTIFY