dataloader/TODO.md

81 lines
4.4 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

## План внедрения
1. **Шаблон + каркас пакета** - сделано
* Создать структуру из ТЗ (один пакет `dataloader/` по src-layout).
* Подтянуть `rest_template.md` артефакты: `os_router.py`, `middleware.py`, `logger/*`.
* `pyproject.toml`: `fastapi`, `uvicorn`, `pydantic-settings`, `sqlalchemy>=2, async`, `psycopg[binary,pool]` или `asyncpg`, `httpx`, `pytest`, `pytest-asyncio`, `httpx[cli]`.
* **Критерий:** `uvicorn dataloader.__main__:app` поднимается, `/health` отдаёт 200.
2. **Конфиг и контекст** - сделано
* `config.py`: `AppSettings` (DSN, тайминги, WORKERS_JSON).
* `context.py`: `AppContext`, создание `AsyncEngine` и `async_sessionmaker`, DI.
* **Критерий:** `/status` возвращает версию/uptime, движок создаётся на старте без попыток коннекта в `/health`.
3. **Хранилище очереди** - в работе
* `storage/db.py`: фабрики engine/sessionmaker.
* `storage/repositories.py`: методы
* `create_or_get(req)`,
* `get_status(job_id)`,
* `cancel(job_id)`,
* `requeue_lost(now)`,
* вспомогательные `claim_one(queue)`, `heartbeat(job_id, ttl)`, `finish_ok(job_id)`, `finish_fail_or_retry(job_id, err)`.
* Только чистый SQL (как в ТЗ), транзакция на операцию.
* **Критерий:** unit-интегра тест «поставил-прочитал-отменил» проходит.
4. **API v1**
* `api/v1/schemas.py`: `TriggerJobRequest/Response`, `JobStatusResponse`.
* `api/v1/service.py`: бизнес-слой над репозиторием.
* `api/v1/router.py`: `POST /jobs/trigger`, `GET /jobs/{id}/status`, `POST /jobs/{id}/cancel`.
* **Критерий:** ручки соответствуют контрактам, идемпотентность по `idempotency_key` работает.
5. **Базовый воркер**
* `workers/base.py`: класс `PGWorker` с циклами `listen_or_sleep → claim → advisory_lock → _pipeline → heartbeat → finish`.
* Идём строго по SQL из ТЗ: `FOR UPDATE SKIP LOCKED`, lease/heartbeat, backoff при lock.
* **Критерий:** локальный мок-пайплайн выполняется, статус `succeeded`.
6. **Менеджер воркеров**
* `workers/manager.py`: парсинг `WORKERS_JSON`, создание `asyncio.Task` на воркеры; мягкая остановка на shutdown.
* Подключение в `__main__.py` через FastAPI `on_startup/on_shutdown`.
* **Критерий:** при старте создаются нужные таски, при SIGTERM корректно гасим.
7. **Реестр пайплайнов**
* `workers/pipelines/registry.py`: `@register(task)`, `resolve(task)`.
* Пустой эталонный пайплайн (no-op, имитирует 23 чанка).
* **Критерий:** задача с `task="noop"` исполняется через реестр.
8. **Reaper**
* Фоновая async-задача в приложении: `requeue_lost` раз в `DL_REAPER_PERIOD_SEC`.
* **Критерий:** задачи с протухшим `lease_expires_at` возвращаются в `queued`.
9. **Интеграционные тесты**
* `tests/integration_tests/v1_api/test_service.py`:
* trigger → status (queued),
* воркер подхватил → status (running),
* done → status (succeeded),
* cancel во время пайплайна → корректная реакция.
* **Критерий:** тесты зелёные в CI.
10. **Dockerfile и запуск**
* Slim образ на Python 3.11/3.12, `uvicorn` entrypoint.
* ENV-пример `.env`, README с запуском.
* **Критерий:** контейнер стартует, воркеры работают, API доступно.
11. **Наблюдаемость**
* Логи в формате шаблона (структурные, маскирование).
* Простая сводка в `/status` (кол-во активных воркеров, конфиг таймингов).
* **Критерий:** видно ключевые переходы статусов и ошибки пайплайнов.