diff --git a/README.md b/README.md index bd93dfc..f48737f 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,246 @@ # Dataloader +Асинхронный сервис постановки и исполнения ETL‑задач поверх очереди в PostgreSQL. Предоставляет HTTP API для триггера задач, мониторинга статуса и отмены. Внутри процесса поднимает пул воркеров, которые конкурируют за задачи через SELECT … FOR UPDATE SKIP LOCKED и обрабатывают их с учётом lease/heartbeat и кооперативной отмены. Для пробуждения воркеров используется LISTEN/NOTIFY. -## Архитектура -Проект построен на базе **FastAPI** с использованием паттерна **Clean Architecture** и следованием принципам **SOLID**. Основные архитектурные компоненты: +## Содержание +- О проекте +- Быстрый старт +- Конфигурация окружения +- Архитектура и потоки данных +- Структура проекта +- Взаимодействие с БД +- HTTP API +- Воркеры, пайплайны и добавление новых ETL‑задач +- Логирование, метрики, аудит +- Тестирование +- Эксплуатация и масштабирование +## О проекте +Сервис решает типовую задачу фоновой обработки задач: один общий пул воркеров, одна очередь в БД, несколько типов задач (пайплайнов), идемпотентность, повторные попытки, контроль конкуренции через advisory‑lock по `lock_key`, кооперативная отмена, возврат «потерянных» задач. + + +## Быстрый старт +1. Установить зависимости (poetry): + ```bash + poetry install + ``` +2. Подготовить PostgreSQL (см. DDL в `DDL.sql`) и переменные окружения (см. «Конфигурация»). +3. Запуск сервиса: + ```bash + poetry run python -m dataloader + ``` +4. Проверка доступности: + ```bash + curl http://localhost:8081/health + ``` +5. Пример постановки задачи: + ```bash + curl -X POST http://localhost:8081/api/v1/jobs/trigger \ + -H "Content-Type: application/json" \ + -d '{ + "queue": "etl.default", + "task": "noop", + "args": {"sleep1": 1, "sleep2": 1, "sleep3": 1}, + "lock_key": "customer:42", + "priority": 100 + }' + ``` + + +## Конфигурация окружения +Настройки собираются в `src/dataloader/config.py` через Pydantic Settings. + +- Приложение (`AppSettings`): + - `APP_HOST` (def: `0.0.0.0`) + - `APP_PORT` (def: `8081`) + - `TIMEZONE` (def: `Europe/Moscow`) + +- Логирование (`LogSettings`): + - `LOG_PATH`, `LOG_FILE_NAME`, `LOG_ROTATION` + - `METRIC_PATH`, `METRIC_FILE_NAME` + - `AUDIT_LOG_PATH`, `AUDIT_LOG_FILE_NAME` + - `DEBUG` переключает уровень на DEBUG + +- PostgreSQL (`PGSettings`): + - `PG_HOST`, `PG_PORT`, `PG_USER`, `PG_PASSWORD`, `PG_DATABASE` + - `PG_SCHEMA_QUEUE` - схема таблиц очереди (логическая, маппится через `schema_translate_map`) + - Параметры пула: `PG_USE_POOL`, `PG_POOL_SIZE`, `PG_MAX_OVERFLOW`, `PG_POOL_RECYCLE` + - Таймауты: `PG_CONNECT_TIMEOUT`, `PG_COMMAND_TIMEOUT` + +- Воркеры (`WorkerSettings`): + - `WORKERS_JSON` - список конфигураций воркеров, например: `[{"queue":"etl.default","concurrency":2}]` + - `DL_HEARTBEAT_SEC` (def: 10) + - `DL_DEFAULT_LEASE_TTL_SEC` (def: 60) + - `DL_REAPER_PERIOD_SEC` (def: 10) + - `DL_CLAIM_BACKOFF_SEC` (def: 15) + + +## Архитектура и потоки данных + +- HTTP слой: FastAPI‑приложение (`dataloader.api`) с v1 API и инфраструктурными маршрутами (`/health`, `/info`). +- Контекст: `AppContext` инициализирует логирование, `AsyncEngine`, `async_sessionmaker` и предоставляет DI (`get_session`). +- Очередь: одна таблица `dl_jobs` и журнал `dl_job_events` в PostgreSQL. Идемпотентность на уровне `idempotency_key`. Пробуждение воркеров через триггеры NOTIFY в БД и listener на стороне приложения. +- Воркеры: `WorkerManager` поднимает N асинхронных воркеров (`PGWorker`) на основании `WORKERS_JSON`. Каждый воркер: + 1) ждёт уведомление (LISTEN/NOTIFY) или таймаут, + 2) пытается «захватить» одну задачу (SELECT … FOR UPDATE SKIP LOCKED), + 3) выставляет `running`, получает advisory‑lock по `lock_key`, + 4) исполняет соответствующий пайплайн с heartbeat, + 5) завершает задачу: `succeeded`/`failed`/`canceled` или возвращает в очередь на ретрай. +- Реапер: фоновая задача, периодически возвращает «потерянные» running‑задачи в `queued`. + + +## Структура +``` +src/dataloader/ +├── __main__.py # Запуск uvicorn, lifecycle через FastAPI lifespan +├── config.py # Pydantic-настройки (app/log/pg/worker) +├── context.py # AppContext: engine, sessionmaker, логгер, DI +│ +├── api/ +│ ├── __init__.py # FastAPI app, middleware, routers, lifespan +│ ├── middleware.py # Логирование запросов + метрики/аудит +│ ├── os_router.py # /health, /info (инфраструктурные ручки) +│ ├── metric_router.py # Примеры метрик (like/dislike) +│ │ +│ └── v1/ +│ ├── router.py # /api/v1/jobs: trigger, status, cancel +│ ├── service.py # Бизнес-логика поверх репозитория +│ ├── schemas.py # Pydantic DTO API +│ └── utils.py # Утилиты (генерация UUID и т.д.) +│ +├── storage/ +│ ├── engine.py # AsyncEngine, sessionmaker, schema_translate_map +│ ├── notify_listener.py # asyncpg LISTEN/NOTIFY по каналу dl_jobs +│ │ +│ ├── models/ +│ │ ├── base.py # Declarative base +│ │ └── queue.py # ORM-модели DLJob, DLJobEvent +│ │ +│ ├── repositories/ +│ │ └── queue.py # QueueRepository (CRUD операции) +│ │ +│ └── schemas/ +│ └── queue.py # CreateJobRequest, JobStatus +│ +├── workers/ +│ ├── base.py # PGWorker: главный цикл, heartbeat, вызов пайплайнов +│ ├── manager.py # WorkerManager: запуск/остановка + reaper +│ ├── reaper.py # Requeue_lost на базе репозитория +│ │ +│ └── pipelines/ +│ ├── __init__.py # Автозагрузка модулей для регистрации +│ ├── registry.py # Реестр обработчиков задач (@register) +│ └── noop.py # Пример эталонного пайплайна +│ +└── logger/ + └── ... # Логирование +``` + + +## Взаимодействие с БД + +- Подключение: `postgresql+asyncpg` через SQLAlchemy AsyncEngine. +- Схемы: логическое имя `queue` маппится на реальную через `schema_translate_map` (см. `engine.py`), имя реальной схемы задаётся `PG_SCHEMA_QUEUE`. +- DDL: см. `DDL.sql`. Ключевые элементы: + - `dl_jobs` с индексами на claim и running‑lease, + - `dl_job_events` как журнал событий, + - триггер `notify_job_ready()` + `LISTEN dl_jobs` для пробуждения воркеров. +- Конкуренция: claim через `FOR UPDATE SKIP LOCKED`, взаимное исключение по бизнес‑сущности через advisory‑lock `pg_try_advisory_lock(hashtext(lock_key))`. +- Надёжность: at‑least‑once. Пайплайны должны быть идемпотентны в части записи в целевые таблицы. + + +## HTTP API (v1) + +- POST `/api/v1/jobs/trigger` + - Вход: `{queue, task, args?, idempotency_key?, lock_key, partition_key?, priority?, available_at?, max_attempts?, lease_ttl_sec?, producer?, consumer_group?}` + - Выход: `{job_id: UUID, status: str}` + - Идемпотентность по `idempotency_key` (если задан). + +- GET `/api/v1/jobs/{job_id}/status` + - Выход: `{job_id, status, attempt, started_at?, finished_at?, heartbeat_at?, error?, progress: {}}` + +- POST `/api/v1/jobs/{job_id}/cancel` + - Выход: как в status + - Поведение: устанавливает `cancel_requested = true`; воркер завершает между шагами пайплайна. + +Инфраструктурные: `/health`, `/info`. + + +## Воркеры, пайплайны и добавление новых ETL‑задач + +### Как работает воркер +1. Ожидает сигнал (LISTEN/NOTIFY) или таймаут `DL_CLAIM_BACKOFF_SEC`. +2. Пытается забрать одну задачу своей очереди: `status='queued' AND available_at<=now()` с `FOR UPDATE SKIP LOCKED`. +3. Переводит в `running`, увеличивает `attempt`, выставляет `lease_expires_at`, делает heartbeat каждые `DL_HEARTBEAT_SEC`. +4. Захватывает advisory‑lock по `lock_key` (если не получилось - возвращает в `queued` с бэкоффом). +5. Выполняет пайплайн (`task`) с поддержкой итеративных шагов и кооперативной отмены. +6. По завершении: `succeeded` или `failed`/`canceled`; при ошибках возможны ретраи до `max_attempts`. + +### Интерфейс пайплайна +Пайплайн - обычная функция, возвращающая одно из: +- асинхронный генератор шагов (рекомендуется для длинных процессов), +- корутину, +- синхронную функцию. + +Каждый «yield» в асинхронном генераторе - безопасная точка, где воркер выполнит heartbeat и проверит `cancel_requested`. + +Регистрация нового пайплайна через декоратор `@register("task_name")` в модуле `src/dataloader/workers/pipelines/.py`. + +Пример: +```python +from __future__ import annotations +import asyncio +from typing import AsyncIterator +from dataloader.workers.pipelines.registry import register + +@register("load.customers") +async def load_customers(args: dict) -> AsyncIterator[None]: + # шаг 1 – вытягиваем данные + await some_fetch(args) + yield + # шаг 2 – пишем в БД идемпотентно (upsert/merge) + await upsert_customers(args) + yield + # шаг 3 – пост‑обработка + await finalize(args) + yield +``` + +Важно: +- Пайплайны должны быть идемпотентны (повторный запуск не должен ломать данные). +- Долгие операции разбивайте на шаги с `yield`, чтобы работал heartbeat и отмена. +- Для бизнес‑взаимного исключения выбирайте корректный `lock_key` (например, `customer:{id}`), чтобы параллельные задачи не конфликтовали. + +### Добавление ETL‑задачи (шаги) +1. Создать модуль в `workers/pipelines/` и зарегистрировать обработчик через `@register`. +2. Убедиться, что модуль импортируется автоматически (происходит на старте через `load_all()`). +3. При необходимости расширить схему `args` и валидацию на стороне продьюсера (кто вызывает API). +4. При постановке задачи в `POST /api/v1/jobs/trigger` указать `task` с новым именем и желаемые `args`. +5. При необходимости завести отдельную очередь (`queue`) и добавить её в `WORKERS_JSON` с нужной `concurrency`. + + +## Логирование, метрики, аудит +- Логи: структурированные, через `logger/*`. Middleware (`api/middleware.py`) логирует входящие запросы/исходящие ответы, время обработки, пишет метрики и аудит‑события. +- Метрики: простые счётчики (пример: likes/dislikes, requests_total, responses_total, duration_ms). +- Аудит: запись бизнес‑событий начала/окончания обработки запроса. + + +## Тестирование +- Юнит‑тесты: `tests/unit/*` +- Интеграционные тесты: `tests/integration_tests/*` - покрывают API, репозиторий очереди, reaper. +- Запуск: + ```bash + poetry run pytest -q + ``` + + +## Эксплуатация и масштабирование +- Один процесс FastAPI с пулом асинхронных воркеров внутри. Масштабирование - количеством реплик (Deployment). Очередь в БД и advisory‑lock обеспечат корректность при гонках между репликами. +- Readiness/Liveness - `/health`. +- Корректное завершение: при SIGTERM менеджер подаёт сигнал остановки, завершает воркеры и reaper, закрывает соединения. + ## Лицензия - Внутренний корпоративный проект. diff --git a/TZ.md b/TZ.md index 180f396..9610b79 100644 --- a/TZ.md +++ b/TZ.md @@ -2,16 +2,16 @@ ## 1) Назначение и рамки -`dataloader` — сервис постановки и исполнения долгих ETL-задач через одну общую очередь в Postgres. Сервис предоставляет HTTP-ручки для триггера задач, мониторинга статуса и отмены; внутри процесса запускает N асинхронных воркеров, которые конкурируют за задачи через `SELECT … FOR UPDATE SKIP LOCKED`, держат lease/heartbeat, делают идемпотентные записи в целевые БД и корректно обрабатывают повторы. +`dataloader` - сервис постановки и исполнения долгих ETL-задач через одну общую очередь в Postgres. Сервис предоставляет HTTP-ручки для триггера задач, мониторинга статуса и отмены; внутри процесса запускает N асинхронных воркеров, которые конкурируют за задачи через `SELECT … FOR UPDATE SKIP LOCKED`, держат lease/heartbeat, делают идемпотентные записи в целевые БД и корректно обрабатывают повторы. -Архитектura и инфраструктурные части соответствуют шаблону `rest_template.md`: единый пакет, `os_router.py` с `/health` и `/status`, middleware логирования, структура каталогов и конфиг-классы — **как в шаблоне**. +Архитектura и инфраструктурные части соответствуют шаблону `rest_template.md`: единый пакет, `os_router.py` с `/health` и `/status`, middleware логирования, структура каталогов и конфиг-классы - **как в шаблоне**. --- ## 2) Архитектура (одно приложение, async) * **FastAPI-приложение**: HTTP API v1, инфраструктурные роуты (`/health`, `/status`) из шаблона, middleware и логирование из шаблона. -* **WorkerManager**: на `startup` читает конфиг (`WORKERS_JSON`) и поднимает M асинхронных воркер-циклов (по очередям и уровням параллелизма). На `shutdown` — мягкая остановка. +* **WorkerManager**: на `startup` читает конфиг (`WORKERS_JSON`) и поднимает M асинхронных воркер-циклов (по очередям и уровням параллелизма). На `shutdown` - мягкая остановка. * **PG Queue**: одна таблица `dl_jobs` на все очереди и сервисы; журнал `dl_job_events`; триггеры LISTEN/NOTIFY для пробуждения воркеров без активного поллинга. --- @@ -164,7 +164,7 @@ FOR EACH ROW EXECUTE FUNCTION notify_job_ready(); Выход: `{… как status …}` Поведение: устанавливает `cancel_requested = true`. Воркер кооперативно завершает задачу между чанками. -Инфраструктурные эндпоинты `/health`, `/status`, мидлвар и регистрация роутов — **как в шаблоне**. +Инфраструктурные эндпоинты `/health`, `/status`, мидлвар и регистрация роутов - **как в шаблоне**. --- @@ -192,7 +192,7 @@ WHERE j.job_id = cte.job_id RETURNING j.job_id, j.task, j.args, j.lock_key, j.partition_key, j.lease_ttl_sec; ``` -Затем `SELECT pg_try_advisory_lock(hashtext(:lock_key))`. Если `false` — `backoff`: +Затем `SELECT pg_try_advisory_lock(hashtext(:lock_key))`. Если `false` - `backoff`: ```sql UPDATE dl_jobs @@ -233,7 +233,7 @@ WHERE job_id=:jid; Всегда выставлять/снимать advisory-lock на `lock_key`. -4. **Отмена**: воркер проверяет `cancel_requested` между чанками; при `true` завершает пайплайн (обычно как `canceled` либо как `failed` без ретраев — политика проекта). +4. **Отмена**: воркер проверяет `cancel_requested` между чанками; при `true` завершает пайплайн (обычно как `canceled` либо как `failed` без ретраев - политика проекта). 5. **Reaper** (фон у приложения): раз в `DL_REAPER_PERIOD_SEC` возвращает «потерянные» задачи в очередь. @@ -250,37 +250,37 @@ RETURNING job_id; ## 7) Оптимизация и SLA -* Claim — O(log N) благодаря частичному индексу `ix_dl_jobs_claim`. -* Reaper — O(log N) по индексу `ix_dl_jobs_running_lease`. -* `/health` — без БД; время ответа ≤ 20 мс. `/jobs/*` — не держат долгих транзакций. -* Гарантия доставки: **at-least-once**; операции записи в целевые таблицы — идемпотентны (реализуется в конкретных пайплайнах). +* Claim - O(log N) благодаря частичному индексу `ix_dl_jobs_claim`. +* Reaper - O(log N) по индексу `ix_dl_jobs_running_lease`. +* `/health` - без БД; время ответа ≤ 20 мс. `/jobs/*` - не держат долгих транзакций. +* Гарантия доставки: **at-least-once**; операции записи в целевые таблицы - идемпотентны (реализуется в конкретных пайплайнах). * Конкуренция: один `lock_key` одновременно исполняется одним воркером; параллелизм достигается независимыми `partition_key`. --- ## 8) Конфигурация (ENV) -* `DL_DB_DSN` — DSN Postgres (async). -* `WORKERS_JSON` — JSON-список конфигураций воркеров, напр.: `[{"queue":"load.cbr","concurrency":2},{"queue":"load.sgx","concurrency":1}]`. +* `DL_DB_DSN` - DSN Postgres (async). +* `WORKERS_JSON` - JSON-список конфигураций воркеров, напр.: `[{"queue":"load.cbr","concurrency":2},{"queue":"load.sgx","concurrency":1}]`. * `DL_HEARTBEAT_SEC` (деф. 10), `DL_DEFAULT_LEASE_TTL_SEC` (деф. 60), `DL_REAPER_PERIOD_SEC` (деф. 10), `DL_CLAIM_BACKOFF_SEC` (деф. 15). -* Логирование, middleware, `uvicorn_logging_config` — **из шаблона без изменения контрактов**. +* Логирование, middleware, `uvicorn_logging_config` - **из шаблона без изменения контрактов**. --- ## 9) Эксплуатация и деплой * Один контейнер, один Pod, **несколько async-воркеров** внутри процесса (через `WorkerManager`). -* Масштабирование — количеством реплик Deployment: очередь в БД, `FOR UPDATE SKIP LOCKED` и advisory-lock обеспечат корректность в гонке. +* Масштабирование - количеством реплик Deployment: очередь в БД, `FOR UPDATE SKIP LOCKED` и advisory-lock обеспечат корректность в гонке. * Пробы: `readiness/liveness` на `/health` из `os_router.py`. -* Завершение: на SIGTERM — остановить reaper, подать сигнал воркерам для мягкой остановки, дождаться тасков с таймаутом. +* Завершение: на SIGTERM - остановить reaper, подать сигнал воркерам для мягкой остановки, дождаться тасков с таймаутом. --- ## 10) Безопасность, аудит, наблюдаемость -* Структурные логи через `logger/*` шаблона; маскирование чувствительных полей — как в `logger/utils.py`. +* Структурные логи через `logger/*` шаблона; маскирование чувствительных полей - как в `logger/utils.py`. * Журнал жизненного цикла в `dl_job_events` (queued/picked/heartbeat/requeue/done/failed/canceled). -* Метрики (BETA) — через `metric_router.py` из шаблона при необходимости. +* Метрики (BETA) - через `metric_router.py` из шаблона при необходимости. --- diff --git a/rest_template.md b/rest_template.md index 4bbdabb..7455459 100644 --- a/rest_template.md +++ b/rest_template.md @@ -61,7 +61,7 @@ aigw-project/ | `__main__.py` | Точка входа. Запускает FastAPI-приложение. | | `config.py` | Загрузка и обработка переменных окружения. | | `base.py` | Базовые классы и типы, переиспользуемые в проекте. | -| `context.py` | Реализация паттерна `AppContext` — единая точка доступа к зависимостям. | +| `context.py` | Реализация паттерна `AppContext` - единая точка доступа к зависимостям. | --- @@ -81,7 +81,7 @@ aigw-project/ | Файл | Назначение | |--------------------|------------| -| `__init__.py` | Конфигуратор FastAPI — регистрация версий и роутов. | +| `__init__.py` | Конфигуратор FastAPI - регистрация версий и роутов. | | `os_router.py` | Инфраструктурные endpoint'ы (`/health`, `/status`). ⚠️ Не редактировать. | | `metric_router.py` | Метрики (BETA). ⚠️ Не редактировать. | | `schemas.py` | Схемы (Pydantic) для `os_router` и `metric_router`. ⚠️ Не редактировать. | @@ -116,7 +116,7 @@ from tenera_etl.logger import logger logger.info("End processing user registration request") ``` -⚠️ Не передавайте в logger.info(...) ничего, кроме строки — она будет записана в поле message. +⚠️ Не передавайте в logger.info(...) ничего, кроме строки - она будет записана в поле message. Маскирование чувствительных данных @@ -126,4 +126,4 @@ logger.info("End processing user registration request") работает автоматически, но вы можете конфигурировать список слов и правила. -Перед добавлением кастомной маскировки — ознакомьтесь с документацией, чтобы избежать утечки данных. \ No newline at end of file +Перед добавлением кастомной маскировки - ознакомьтесь с документацией, чтобы избежать утечки данных. \ No newline at end of file diff --git a/src/dataloader/workers/base.py b/src/dataloader/workers/base.py index d8466a6..3c809d6 100644 --- a/src/dataloader/workers/base.py +++ b/src/dataloader/workers/base.py @@ -38,7 +38,7 @@ class PGWorker: async def run(self) -> None: """ - Главный цикл: ожидание → claim → исполнение → завершение. + Главный цикл: ожидание -> claim -> исполнение -> завершение. """ self._log.info(f"worker.start queue={self._cfg.queue}")