dataloader/TODO.md

4.4 KiB
Raw Permalink Blame History

План внедрения

  1. Шаблон + каркас пакета - сделано

    • Создать структуру из ТЗ (один пакет dataloader/ по src-layout).
    • Подтянуть rest_template.md артефакты: os_router.py, middleware.py, logger/*.
    • pyproject.toml: fastapi, uvicorn, pydantic-settings, sqlalchemy>=2, async, psycopg[binary,pool] или asyncpg, httpx, pytest, pytest-asyncio, httpx[cli].
    • Критерий: uvicorn dataloader.__main__:app поднимается, /health отдаёт 200.
  2. Конфиг и контекст - сделано

    • config.py: AppSettings (DSN, тайминги, WORKERS_JSON).
    • context.py: AppContext, создание AsyncEngine и async_sessionmaker, DI.
    • Критерий: /status возвращает версию/uptime, движок создаётся на старте без попыток коннекта в /health.
  3. Хранилище очереди - в работе

    • storage/db.py: фабрики engine/sessionmaker.

    • storage/repositories.py: методы

      • create_or_get(req),
      • get_status(job_id),
      • cancel(job_id),
      • requeue_lost(now),
      • вспомогательные claim_one(queue), heartbeat(job_id, ttl), finish_ok(job_id), finish_fail_or_retry(job_id, err).
    • Только чистый SQL (как в ТЗ), транзакция на операцию.

    • Критерий: unit-интегра тест «поставил-прочитал-отменил» проходит.

  4. API v1

    • api/v1/schemas.py: TriggerJobRequest/Response, JobStatusResponse.
    • api/v1/service.py: бизнес-слой над репозиторием.
    • api/v1/router.py: POST /jobs/trigger, GET /jobs/{id}/status, POST /jobs/{id}/cancel.
    • Критерий: ручки соответствуют контрактам, идемпотентность по idempotency_key работает.
  5. Базовый воркер

    • workers/base.py: класс PGWorker с циклами listen_or_sleep → claim → advisory_lock → _pipeline → heartbeat → finish.
    • Идём строго по SQL из ТЗ: FOR UPDATE SKIP LOCKED, lease/heartbeat, backoff при lock.
    • Критерий: локальный мок-пайплайн выполняется, статус succeeded.
  6. Менеджер воркеров

    • workers/manager.py: парсинг WORKERS_JSON, создание asyncio.Task на воркеры; мягкая остановка на shutdown.
    • Подключение в __main__.py через FastAPI on_startup/on_shutdown.
    • Критерий: при старте создаются нужные таски, при SIGTERM корректно гасим.
  7. Реестр пайплайнов

    • workers/pipelines/registry.py: @register(task), resolve(task).
    • Пустой эталонный пайплайн (no-op, имитирует 23 чанка).
    • Критерий: задача с task="noop" исполняется через реестр.
  8. Reaper

    • Фоновая async-задача в приложении: requeue_lost раз в DL_REAPER_PERIOD_SEC.
    • Критерий: задачи с протухшим lease_expires_at возвращаются в queued.
  9. Интеграционные тесты

    • tests/integration_tests/v1_api/test_service.py:

      • trigger → status (queued),
      • воркер подхватил → status (running),
      • done → status (succeeded),
      • cancel во время пайплайна → корректная реакция.
    • Критерий: тесты зелёные в CI.

  10. Dockerfile и запуск

  • Slim образ на Python 3.11/3.12, uvicorn entrypoint.
  • ENV-пример .env, README с запуском.
  • Критерий: контейнер стартует, воркеры работают, API доступно.
  1. Наблюдаемость
  • Логи в формате шаблона (структурные, маскирование).
  • Простая сводка в /status (кол-во активных воркеров, конфиг таймингов).
  • Критерий: видно ключевые переходы статусов и ошибки пайплайнов.