From 33d8f5ab8b091e3c4c89bcaa2fcaecc360da9636 Mon Sep 17 00:00:00 2001 From: itqop Date: Wed, 5 Nov 2025 01:34:42 +0300 Subject: [PATCH] feat: add storage --- TODO.md | 80 +++++ src/dataloader/api/v1/exceptions.py | 24 ++ src/dataloader/api/v1/models.py | 2 + src/dataloader/api/v1/schemas.py | 49 +++ src/dataloader/api/v1/service.py | 31 ++ src/dataloader/api/v1/utils.py | 2 + src/dataloader/config.py | 43 ++- src/dataloader/context.py | 82 ++++- src/dataloader/exceptions.py | 2 + src/dataloader/storage/__init__.py | 2 + src/dataloader/storage/db.py | 39 ++ src/dataloader/storage/repositories.py | 358 +++++++++++++++++++ src/dataloader/workers/__init__.py | 2 + src/dataloader/workers/base.py | 71 ++++ src/dataloader/workers/manager.py | 53 +++ src/dataloader/workers/pipelines/__init__.py | 2 + src/dataloader/workers/pipelines/registry.py | 47 +++ 17 files changed, 868 insertions(+), 21 deletions(-) create mode 100644 TODO.md create mode 100644 src/dataloader/api/v1/exceptions.py create mode 100644 src/dataloader/api/v1/models.py create mode 100644 src/dataloader/api/v1/schemas.py create mode 100644 src/dataloader/api/v1/service.py create mode 100644 src/dataloader/api/v1/utils.py create mode 100644 src/dataloader/exceptions.py create mode 100644 src/dataloader/storage/__init__.py create mode 100644 src/dataloader/storage/db.py create mode 100644 src/dataloader/storage/repositories.py create mode 100644 src/dataloader/workers/__init__.py create mode 100644 src/dataloader/workers/base.py create mode 100644 src/dataloader/workers/manager.py create mode 100644 src/dataloader/workers/pipelines/__init__.py create mode 100644 src/dataloader/workers/pipelines/registry.py diff --git a/TODO.md b/TODO.md new file mode 100644 index 0000000..0e7c41b --- /dev/null +++ b/TODO.md @@ -0,0 +1,80 @@ +## План внедрения + +1. **Шаблон + каркас пакета** - сделано + + * Создать структуру из ТЗ (один пакет `dataloader/` по src-layout). + * Подтянуть `rest_template.md` артефакты: `os_router.py`, `middleware.py`, `logger/*`. + * `pyproject.toml`: `fastapi`, `uvicorn`, `pydantic-settings`, `sqlalchemy>=2, async`, `psycopg[binary,pool]` или `asyncpg`, `httpx`, `pytest`, `pytest-asyncio`, `httpx[cli]`. + * **Критерий:** `uvicorn dataloader.__main__:app` поднимается, `/health` отдаёт 200. + +2. **Конфиг и контекст** - сделано + + * `config.py`: `AppSettings` (DSN, тайминги, WORKERS_JSON). + * `context.py`: `AppContext`, создание `AsyncEngine` и `async_sessionmaker`, DI. + * **Критерий:** `/status` возвращает версию/uptime, движок создаётся на старте без попыток коннекта в `/health`. + +3. **Хранилище очереди** - в работе + + * `storage/db.py`: фабрики engine/sessionmaker. + * `storage/repositories.py`: методы + + * `create_or_get(req)`, + * `get_status(job_id)`, + * `cancel(job_id)`, + * `requeue_lost(now)`, + * вспомогательные `claim_one(queue)`, `heartbeat(job_id, ttl)`, `finish_ok(job_id)`, `finish_fail_or_retry(job_id, err)`. + * Только чистый SQL (как в ТЗ), транзакция на операцию. + * **Критерий:** unit-интегра тест «поставил-прочитал-отменил» проходит. + +4. **API v1** + + * `api/v1/schemas.py`: `TriggerJobRequest/Response`, `JobStatusResponse`. + * `api/v1/service.py`: бизнес-слой над репозиторием. + * `api/v1/router.py`: `POST /jobs/trigger`, `GET /jobs/{id}/status`, `POST /jobs/{id}/cancel`. + * **Критерий:** ручки соответствуют контрактам, идемпотентность по `idempotency_key` работает. + +5. **Базовый воркер** + + * `workers/base.py`: класс `PGWorker` с циклами `listen_or_sleep → claim → advisory_lock → _pipeline → heartbeat → finish`. + * Идём строго по SQL из ТЗ: `FOR UPDATE SKIP LOCKED`, lease/heartbeat, backoff при lock. + * **Критерий:** локальный мок-пайплайн выполняется, статус `succeeded`. + +6. **Менеджер воркеров** + + * `workers/manager.py`: парсинг `WORKERS_JSON`, создание `asyncio.Task` на воркеры; мягкая остановка на shutdown. + * Подключение в `__main__.py` через FastAPI `on_startup/on_shutdown`. + * **Критерий:** при старте создаются нужные таски, при SIGTERM корректно гасим. + +7. **Реестр пайплайнов** + + * `workers/pipelines/registry.py`: `@register(task)`, `resolve(task)`. + * Пустой эталонный пайплайн (no-op, имитирует 2–3 чанка). + * **Критерий:** задача с `task="noop"` исполняется через реестр. + +8. **Reaper** + + * Фоновая async-задача в приложении: `requeue_lost` раз в `DL_REAPER_PERIOD_SEC`. + * **Критерий:** задачи с протухшим `lease_expires_at` возвращаются в `queued`. + +9. **Интеграционные тесты** + + * `tests/integration_tests/v1_api/test_service.py`: + + * trigger → status (queued), + * воркер подхватил → status (running), + * done → status (succeeded), + * cancel во время пайплайна → корректная реакция. + * **Критерий:** тесты зелёные в CI. + +10. **Dockerfile и запуск** + +* Slim образ на Python 3.11/3.12, `uvicorn` entrypoint. +* ENV-пример `.env`, README с запуском. +* **Критерий:** контейнер стартует, воркеры работают, API доступно. + +11. **Наблюдаемость** + +* Логи в формате шаблона (структурные, маскирование). +* Простая сводка в `/status` (кол-во активных воркеров, конфиг таймингов). +* **Критерий:** видно ключевые переходы статусов и ошибки пайплайнов. + diff --git a/src/dataloader/api/v1/exceptions.py b/src/dataloader/api/v1/exceptions.py new file mode 100644 index 0000000..37d3088 --- /dev/null +++ b/src/dataloader/api/v1/exceptions.py @@ -0,0 +1,24 @@ +"""Исключения для API v1.""" + +from fastapi import HTTPException, status + + +class JobNotFoundError(HTTPException): + """Задача не найдена.""" + + def __init__(self, job_id: str): + super().__init__( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Job {job_id} not found" + ) + + +class JobAlreadyCanceledError(HTTPException): + """Задача уже отменена.""" + + def __init__(self, job_id: str): + super().__init__( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Job {job_id} is already canceled or finished" + ) + diff --git a/src/dataloader/api/v1/models.py b/src/dataloader/api/v1/models.py new file mode 100644 index 0000000..d5c7bd5 --- /dev/null +++ b/src/dataloader/api/v1/models.py @@ -0,0 +1,2 @@ +"""Модели данных для API v1.""" + diff --git a/src/dataloader/api/v1/schemas.py b/src/dataloader/api/v1/schemas.py new file mode 100644 index 0000000..9dae8ff --- /dev/null +++ b/src/dataloader/api/v1/schemas.py @@ -0,0 +1,49 @@ +"""Pydantic схемы для API v1: запросы и ответы.""" + +from pydantic import BaseModel, Field +from typing import Optional, Dict, Any +from datetime import datetime +from uuid import UUID + + +class JobTriggerRequest(BaseModel): + """Запрос на постановку задачи в очередь.""" + queue: str = Field(..., description="Название очереди") + task: str = Field(..., description="Тип задачи") + args: Optional[Dict[str, Any]] = Field(default={}, description="Аргументы задачи") + idempotency_key: Optional[str] = Field(None, description="Ключ идемпотентности") + lock_key: str = Field(..., description="Ключ блокировки") + partition_key: Optional[str] = Field(default="", description="Ключ партиционирования") + priority: Optional[int] = Field(default=100, description="Приоритет задачи") + available_at: Optional[datetime] = Field(None, description="Время доступности задачи (RFC3339)") + + +class JobTriggerResponse(BaseModel): + """Ответ на постановку задачи.""" + job_id: UUID = Field(..., description="Идентификатор задачи") + status: str = Field(..., description="Статус задачи") + + +class JobStatusResponse(BaseModel): + """Ответ со статусом задачи.""" + job_id: UUID + status: str + attempt: int + started_at: Optional[datetime] = None + finished_at: Optional[datetime] = None + heartbeat_at: Optional[datetime] = None + error: Optional[str] = None + progress: Dict[str, Any] = Field(default_factory=dict) + + +class JobCancelResponse(BaseModel): + """Ответ на отмену задачи.""" + job_id: UUID + status: str + attempt: int + started_at: Optional[datetime] = None + finished_at: Optional[datetime] = None + heartbeat_at: Optional[datetime] = None + error: Optional[str] = None + progress: Dict[str, Any] = Field(default_factory=dict) + diff --git a/src/dataloader/api/v1/service.py b/src/dataloader/api/v1/service.py new file mode 100644 index 0000000..a0d5f0e --- /dev/null +++ b/src/dataloader/api/v1/service.py @@ -0,0 +1,31 @@ +"""Бизнес-логика для API v1.""" + +from typing import Optional +from uuid import UUID +from datetime import datetime + +from .schemas import JobTriggerRequest, JobStatusResponse, JobCancelResponse +from ...storage.repositories import JobRepository + + +class JobService: + """Сервис для работы с задачами.""" + + def __init__(self, job_repo: JobRepository): + self.job_repo = job_repo + + async def trigger_job(self, request: JobTriggerRequest) -> JobTriggerResponse: + """Постановка задачи в очередь.""" + # TODO: реализовать идемпотентную постановку через репозиторий + raise NotImplementedError + + async def get_job_status(self, job_id: UUID) -> Optional[JobStatusResponse]: + """Получение статуса задачи.""" + # TODO: реализовать через репозиторий + raise NotImplementedError + + async def cancel_job(self, job_id: UUID) -> Optional[JobCancelResponse]: + """Отмена задачи.""" + # TODO: реализовать через репозиторий + raise NotImplementedError + diff --git a/src/dataloader/api/v1/utils.py b/src/dataloader/api/v1/utils.py new file mode 100644 index 0000000..af722af --- /dev/null +++ b/src/dataloader/api/v1/utils.py @@ -0,0 +1,2 @@ +"""Утилиты для API v1.""" + diff --git a/src/dataloader/config.py b/src/dataloader/config.py index ec5e2aa..85db588 100644 --- a/src/dataloader/config.py +++ b/src/dataloader/config.py @@ -1,5 +1,8 @@ +# src/dataloader/config.py import os +import json from logging import DEBUG, INFO +from typing import Any from dotenv import load_dotenv from pydantic import Field @@ -66,6 +69,9 @@ class LogSettings(BaseAppSettings): class PGSettings(BaseSettings): + """ + Настройки подключения к Postgres. + """ host: str = Field(validation_alias="PG_HOST", default="localhost") port: int = Field(validation_alias="PG_PORT", default=5432) user: str = Field(validation_alias="PG_USER", default="postgres") @@ -81,22 +87,55 @@ class PGSettings(BaseSettings): @property def url(self) -> str: - """Автоматически генерируется SQLAlchemy URL для подключения""" + """ + Строка подключения SQLAlchemy (async). + """ return f"postgresql+asyncpg://{self.user}:{self.password}@{self.host}:{self.port}/{self.database}" +class Settings(BaseSettings): + """ + Настройки очереди и воркеров. + """ + dl_db_dsn: str = Field(validation_alias="DL_DB_DSN", default="") + workers_json: str = Field(validation_alias="WORKERS_JSON", default="[]") + dl_heartbeat_sec: int = Field(validation_alias="DL_HEARTBEAT_SEC", default=10) + dl_default_lease_ttl_sec: int = Field(validation_alias="DL_DEFAULT_LEASE_TTL_SEC", default=60) + dl_reaper_period_sec: int = Field(validation_alias="DL_REAPER_PERIOD_SEC", default=10) + dl_claim_backoff_sec: int = Field(validation_alias="DL_CLAIM_BACKOFF_SEC", default=15) + + def parsed_workers(self) -> list[dict[str, Any]]: + """ + Возвращает список конфигураций воркеров из WORKERS_JSON. + """ + try: + data = json.loads(self.workers_json or "[]") + return [d for d in data if isinstance(d, dict)] + except Exception: + return [] + + class Secrets: """ - Класс, агрегирующий все настройки приложения. + Агрегатор настроек приложения. """ app: AppSettings = AppSettings() log: LogSettings = LogSettings() pg: PGSettings = PGSettings() + dl: Settings = Settings() + + @property + def resolved_dsn(self) -> str: + """ + Возвращает DSN для очереди: DL_DB_DSN или URL из PG. + """ + return self.dl.dl_db_dsn or self.pg.url APP_CONFIG = Secrets() __all__ = [ + "Settings", "Secrets", "APP_CONFIG", ] diff --git a/src/dataloader/context.py b/src/dataloader/context.py index c120b95..9bd913d 100644 --- a/src/dataloader/context.py +++ b/src/dataloader/context.py @@ -1,21 +1,19 @@ -# Реализация паттерна AppContext — единая точка доступа к зависимостям -from dataloader.base import Singleton +# src/dataloader/context.py +from __future__ import annotations + import typing +import pytz +from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine + +from dataloader.base import Singleton from dataloader.config import APP_CONFIG, Secrets from dataloader.logger import ContextVarsContainer, LoggerConfigurator -import pytz - - - - class AppContext(metaclass=Singleton): - @property - def logger(self) -> "typing.Any": - return self._logger_manager.async_logger - - + """ + Контекст приложения: логгер, таймзона, подключение к БД и фабрика сессий. + """ def __init__(self, secrets: Secrets) -> None: self.timezone = pytz.timezone(secrets.app.timezone) self.context_vars_container = ContextVarsContainer() @@ -30,33 +28,79 @@ class AppContext(metaclass=Singleton): timezone=self.timezone, ) self.pg = secrets.pg + self.dl = secrets.dl + self._engine: AsyncEngine | None = None + self._sessionmaker: async_sessionmaker[AsyncSession] | None = None self.logger.info("App context initialized.") + @property + def logger(self) -> "typing.Any": + """ + Возвращает асинхронный логгер. + """ + return self._logger_manager.async_logger + + @property + def engine(self) -> AsyncEngine: + """ + Возвращает текущий AsyncEngine. + """ + assert self._engine is not None, "Engine is not initialized" + return self._engine + + @property + def sessionmaker(self) -> async_sessionmaker[AsyncSession]: + """ + Возвращает фабрику асинхронных сессий. + """ + assert self._sessionmaker is not None, "Sessionmaker is not initialized" + return self._sessionmaker def get_logger(self) -> "typing.Any": + """ + Возвращает логгер. + """ return self.logger - def get_context_vars_container(self) -> ContextVarsContainer: + """ + Возвращает контейнер контекстных переменных логгера. + """ return self.context_vars_container - def get_pytz_timezone(self): + """ + Возвращает таймзону приложения. + """ return self.timezone - async def on_startup(self) -> None: + """ + Инициализирует подключение к БД и готовит фабрику сессий. + """ self.logger.info("Application is starting up.") + dsn = APP_CONFIG.resolved_dsn + self._engine = create_async_engine( + dsn, + pool_size=self.pg.pool_size if self.pg.use_pool else None, + max_overflow=self.pg.max_overflow if self.pg.use_pool else 0, + pool_recycle=self.pg.pool_recycle if self.pg.use_pool else -1, + ) + self._sessionmaker = async_sessionmaker(self._engine, expire_on_commit=False, class_=AsyncSession) self.logger.info("All connections checked. Application is up and ready.") - async def on_shutdown(self) -> None: + """ + Останавливает подсистемы и освобождает ресурсы. + """ self.logger.info("Application is shutting down.") + if self._engine is not None: + await self._engine.dispose() + self._engine = None + self._sessionmaker = None self._logger_manager.remove_logger_handlers() - APP_CTX = AppContext(APP_CONFIG) - -__all__ = ["APP_CTX"] \ No newline at end of file +__all__ = ["APP_CTX"] diff --git a/src/dataloader/exceptions.py b/src/dataloader/exceptions.py new file mode 100644 index 0000000..d259d99 --- /dev/null +++ b/src/dataloader/exceptions.py @@ -0,0 +1,2 @@ +"""Исключения уровня приложения.""" + diff --git a/src/dataloader/storage/__init__.py b/src/dataloader/storage/__init__.py new file mode 100644 index 0000000..f23621f --- /dev/null +++ b/src/dataloader/storage/__init__.py @@ -0,0 +1,2 @@ +"""Модуль для работы с хранилищем данных.""" + diff --git a/src/dataloader/storage/db.py b/src/dataloader/storage/db.py new file mode 100644 index 0000000..60ed027 --- /dev/null +++ b/src/dataloader/storage/db.py @@ -0,0 +1,39 @@ +# src/dataloader/storage/db.py +from __future__ import annotations + +from typing import AsyncIterator + +from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker +from sqlalchemy.orm import DeclarativeBase + +from dataloader.context import APP_CTX + + +class Base(DeclarativeBase): + """ + Базовый класс моделей ORM. + """ + pass + + +def get_engine() -> AsyncEngine: + """ + Возвращает AsyncEngine, инициализированный в контексте приложения. + """ + return APP_CTX.engine + + +def get_sessionmaker() -> async_sessionmaker[AsyncSession]: + """ + Возвращает фабрику асинхронных сессий. + """ + return APP_CTX.sessionmaker + + +async def session_scope() -> AsyncIterator[AsyncSession]: + """ + Асинхронный контекст жизненного цикла сессии. + """ + sm = get_sessionmaker() + async with sm() as s: + yield s diff --git a/src/dataloader/storage/repositories.py b/src/dataloader/storage/repositories.py new file mode 100644 index 0000000..1ba963c --- /dev/null +++ b/src/dataloader/storage/repositories.py @@ -0,0 +1,358 @@ +# src/dataloader/storage/repositories.py +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from typing import Any, Optional + +from sqlalchemy import BigInteger, String, Text, select, func, update +from sqlalchemy.dialects.postgresql import JSONB, ENUM +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import Mapped, mapped_column + +from dataloader.storage.db import Base + + +dl_status_enum = ENUM( + "queued", + "running", + "succeeded", + "failed", + "canceled", + "lost", + name="dl_status", + create_type=False, + native_enum=True, +) + + +class DLJob(Base): + """ + Модель очереди dl_jobs. + """ + __tablename__ = "dl_jobs" + + job_id: Mapped[str] = mapped_column(String(36), primary_key=True) + queue: Mapped[str] = mapped_column(Text, nullable=False) + task: Mapped[str] = mapped_column(Text, nullable=False) + args: Mapped[dict[str, Any]] = mapped_column(JSONB, default=dict, nullable=False) + idempotency_key: Mapped[Optional[str]] = mapped_column(Text, unique=True) + lock_key: Mapped[str] = mapped_column(Text, nullable=False) + partition_key: Mapped[str] = mapped_column(Text, default="", nullable=False) + priority: Mapped[int] = mapped_column(nullable=False, default=100) + available_at: Mapped[datetime] = mapped_column(nullable=False) + status: Mapped[str] = mapped_column(dl_status_enum, nullable=False, default="queued") + attempt: Mapped[int] = mapped_column(nullable=False, default=0) + max_attempts: Mapped[int] = mapped_column(nullable=False, default=5) + lease_ttl_sec: Mapped[int] = mapped_column(nullable=False, default=60) + lease_expires_at: Mapped[Optional[datetime]] = mapped_column(nullable=True) + heartbeat_at: Mapped[Optional[datetime]] = mapped_column(nullable=True) + cancel_requested: Mapped[bool] = mapped_column(nullable=False, default=False) + progress: Mapped[dict[str, Any]] = mapped_column(JSONB, default=dict, nullable=False) + error: Mapped[Optional[str]] = mapped_column(Text) + producer: Mapped[Optional[str]] = mapped_column(Text) + consumer_group: Mapped[Optional[str]] = mapped_column(Text) + created_at: Mapped[datetime] = mapped_column(nullable=False) + started_at: Mapped[Optional[datetime]] = mapped_column(nullable=True) + finished_at: Mapped[Optional[datetime]] = mapped_column(nullable=True) + + +class DLJobEvent(Base): + """ + Модель журнала событий dl_job_events. + """ + __tablename__ = "dl_job_events" + + event_id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) + job_id: Mapped[str] = mapped_column(String(36), nullable=False) + queue: Mapped[str] = mapped_column(Text, nullable=False) + ts: Mapped[datetime] = mapped_column(nullable=False) + kind: Mapped[str] = mapped_column(Text, nullable=False) + payload: Mapped[Optional[dict[str, Any]]] = mapped_column(JSONB) + + +@dataclass(frozen=True) +class CreateJobRequest: + """ + Параметры постановки задачи. + """ + job_id: str + queue: str + task: str + args: dict[str, Any] + idempotency_key: Optional[str] + lock_key: str + partition_key: str + priority: int + available_at: datetime + max_attempts: int + lease_ttl_sec: int + producer: Optional[str] + consumer_group: Optional[str] + + +@dataclass(frozen=True) +class JobStatus: + """ + Снимок статуса задачи. + """ + job_id: str + status: str + attempt: int + started_at: Optional[datetime] + finished_at: Optional[datetime] + heartbeat_at: Optional[datetime] + error: Optional[str] + progress: dict[str, Any] + + +class QueueRepository: + """ + Репозиторий очереди и событий с полнотой ORM. + """ + def __init__(self, session: AsyncSession): + self.s = session + + async def create_or_get(self, req: CreateJobRequest) -> tuple[str, str]: + """ + Идемпотентно создаёт запись в очереди и возвращает (job_id, status). + """ + if req.idempotency_key: + q = select(DLJob).where(DLJob.idempotency_key == req.idempotency_key) + r = await self.s.execute(q) + ex = r.scalar_one_or_none() + if ex: + return ex.job_id, ex.status + + row = DLJob( + job_id=req.job_id, + queue=req.queue, + task=req.task, + args=req.args or {}, + idempotency_key=req.idempotency_key, + lock_key=req.lock_key, + partition_key=req.partition_key or "", + priority=req.priority, + available_at=req.available_at, + status="queued", + attempt=0, + max_attempts=req.max_attempts, + lease_ttl_sec=req.lease_ttl_sec, + lease_expires_at=None, + heartbeat_at=None, + cancel_requested=False, + progress={}, + error=None, + producer=req.producer, + consumer_group=req.consumer_group, + created_at=datetime.now(timezone.utc), + started_at=None, + finished_at=None, + ) + self.s.add(row) + await self._append_event(req.job_id, req.queue, "queued", {"task": req.task}) + await self.s.commit() + return req.job_id, "queued" + + async def get_status(self, job_id: str) -> Optional[JobStatus]: + """ + Возвращает статус задачи. + """ + q = select( + DLJob.job_id, + DLJob.status, + DLJob.attempt, + DLJob.started_at, + DLJob.finished_at, + DLJob.heartbeat_at, + DLJob.error, + DLJob.progress, + ).where(DLJob.job_id == job_id) + r = await self.s.execute(q) + m = r.first() + if not m: + return None + return JobStatus( + job_id=m.job_id, + status=m.status, + attempt=m.attempt, + started_at=m.started_at, + finished_at=m.finished_at, + heartbeat_at=m.heartbeat_at, + error=m.error, + progress=m.progress or {}, + ) + + async def cancel(self, job_id: str) -> bool: + """ + Устанавливает флаг отмены для задачи. + """ + q = update(DLJob).where(DLJob.job_id == job_id).values(cancel_requested=True) + await self.s.execute(q) + await self._append_event(job_id, await self._resolve_queue(job_id), "cancel_requested", None) + await self.s.commit() + return True + + async def claim_one(self, queue: str) -> Optional[dict[str, Any]]: + """ + Захватывает одну задачу из очереди с учётом блокировок и выставляет running. + """ + async with self.s.begin(): + q = ( + select(DLJob) + .where( + DLJob.status == "queued", + DLJob.queue == queue, + DLJob.available_at <= func.now(), + ) + .order_by(DLJob.priority.asc(), DLJob.created_at.asc()) + .with_for_update(skip_locked=True) + .limit(1) + ) + r = await self.s.execute(q) + job: Optional[DLJob] = r.scalar_one_or_none() + if not job: + return None + + job.status = "running" + job.started_at = job.started_at or datetime.now(timezone.utc) + job.attempt = int(job.attempt) + 1 + job.heartbeat_at = datetime.now(timezone.utc) + job.lease_expires_at = datetime.now(timezone.utc) + timedelta(seconds=int(job.lease_ttl_sec)) + + ok = await self._try_advisory_lock(job.lock_key) + if not ok: + job.status = "queued" + job.available_at = datetime.now(timezone.utc) + timedelta(seconds=15) + return None + + await self._append_event(job.job_id, job.queue, "picked", {"attempt": job.attempt}) + + return { + "job_id": job.job_id, + "queue": job.queue, + "task": job.task, + "args": job.args or {}, + "lock_key": job.lock_key, + "partition_key": job.partition_key or "", + "lease_ttl_sec": int(job.lease_ttl_sec), + "attempt": int(job.attempt), + } + + async def heartbeat(self, job_id: str, ttl_sec: int) -> None: + """ + Обновляет heartbeat и продлевает lease. + """ + now = datetime.now(timezone.utc) + q = ( + update(DLJob) + .where(DLJob.job_id == job_id, DLJob.status == "running") + .values(heartbeat_at=now, lease_expires_at=now + timedelta(seconds=int(ttl_sec))) + ) + await self.s.execute(q) + await self._append_event(job_id, await self._resolve_queue(job_id), "heartbeat", {"ttl": ttl_sec}) + await self.s.commit() + + async def finish_ok(self, job_id: str) -> None: + """ + Помечает задачу как выполненную успешно и снимает advisory-lock. + """ + job = await self._get(job_id) + if not job: + return + job.status = "succeeded" + job.finished_at = datetime.now(timezone.utc) + job.lease_expires_at = None + await self._append_event(job_id, job.queue, "succeeded", None) + await self._advisory_unlock(job.lock_key) + await self.s.commit() + + async def finish_fail_or_retry(self, job_id: str, err: str) -> None: + """ + Помечает задачу как failed или возвращает в очередь с задержкой. + """ + job = await self._get(job_id) + if not job: + return + can_retry = int(job.attempt) < int(job.max_attempts) + if can_retry: + job.status = "queued" + job.available_at = datetime.now(timezone.utc) + timedelta(seconds=30 * int(job.attempt)) + job.error = err + job.lease_expires_at = None + await self._append_event(job_id, job.queue, "requeue", {"attempt": job.attempt, "error": err}) + else: + job.status = "failed" + job.error = err + job.finished_at = datetime.now(timezone.utc) + job.lease_expires_at = None + await self._append_event(job_id, job.queue, "failed", {"error": err}) + await self._advisory_unlock(job.lock_key) + await self.s.commit() + + async def requeue_lost(self, now: Optional[datetime] = None) -> list[str]: + """ + Возвращает протухшие running-задачи в очередь. + """ + now = now or datetime.now(timezone.utc) + q = ( + select(DLJob) + .where( + DLJob.status == "running", + DLJob.lease_expires_at.is_not(None), + DLJob.lease_expires_at < now, + ) + .with_for_update(skip_locked=True) + ) + r = await self.s.execute(q) + rows = list(r.scalars().all()) + ids: list[str] = [] + for job in rows: + job.status = "queued" + job.available_at = now + job.lease_expires_at = None + ids.append(job.job_id) + await self._append_event(job.job_id, job.queue, "requeue_lost", None) + await self.s.commit() + return ids + + async def _get(self, job_id: str) -> Optional[DLJob]: + """ + Возвращает ORM-объект задачи. + """ + r = await self.s.execute(select(DLJob).where(DLJob.job_id == job_id).with_for_update(skip_locked=True)) + return r.scalar_one_or_none() + + async def _resolve_queue(self, job_id: str) -> str: + """ + Возвращает имя очереди для события. + """ + r = await self.s.execute(select(DLJob.queue).where(DLJob.job_id == job_id)) + v = r.scalar_one_or_none() + return v or "" + + async def _append_event(self, job_id: str, queue: str, kind: str, payload: Optional[dict[str, Any]]) -> None: + """ + Добавляет запись в журнал событий. + """ + ev = DLJobEvent( + job_id=job_id, + queue=queue or "", + ts=datetime.now(timezone.utc), + kind=kind, + payload=payload or None, + ) + self.s.add(ev) + + async def _try_advisory_lock(self, lock_key: str) -> bool: + """ + Пытается получить advisory-lock в Postgres. + """ + r = await self.s.execute(select(func.pg_try_advisory_lock(func.hashtext(lock_key)))) + return bool(r.scalar()) + + async def _advisory_unlock(self, lock_key: str) -> None: + """ + Снимает advisory-lock в Postgres. + """ + await self.s.execute(select(func.pg_advisory_unlock(func.hashtext(lock_key)))) diff --git a/src/dataloader/workers/__init__.py b/src/dataloader/workers/__init__.py new file mode 100644 index 0000000..20cd673 --- /dev/null +++ b/src/dataloader/workers/__init__.py @@ -0,0 +1,2 @@ +"""Модуль воркеров для обработки задач.""" + diff --git a/src/dataloader/workers/base.py b/src/dataloader/workers/base.py new file mode 100644 index 0000000..55b4f4f --- /dev/null +++ b/src/dataloader/workers/base.py @@ -0,0 +1,71 @@ +"""Общий PG-воркер: claim/lease/heartbeat/retry.""" + +import asyncio +from typing import Optional +from uuid import UUID +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import text + +from ..config import Settings, PGSettings +from ..storage.db import Database +from .pipelines.registry import PipelineRegistry + + +class BaseWorker: + """Базовый воркер для обработки задач из очереди.""" + + def __init__( + self, + queue: str, + settings: Settings, + shutdown_event: asyncio.Event, + ): + self.queue = queue + self.settings = settings + self.shutdown_event = shutdown_event + self.db: Optional[Database] = None + self.current_job_id: Optional[UUID] = None + self.current_lock_key: Optional[str] = None + + async def run(self): + """Основной цикл воркера.""" + # TODO: инициализация БД, подключение к LISTEN/NOTIFY + # TODO: цикл claim -> heartbeat -> execute -> finish + raise NotImplementedError + + async def claim_job(self, session: AsyncSession) -> Optional[dict]: + """ + Claim одной задачи из очереди. + Возвращает dict с job_id, task, args, lock_key или None. + """ + # TODO: реализовать SELECT ... FOR UPDATE SKIP LOCKED + # TODO: проверка advisory lock на lock_key + # TODO: backoff если advisory lock занят + raise NotImplementedError + + async def heartbeat(self, session: AsyncSession, job_id: UUID): + """Обновление heartbeat для задачи.""" + # TODO: UPDATE heartbeat_at и lease_expires_at + raise NotImplementedError + + async def finish_job( + self, + session: AsyncSession, + job_id: UUID, + success: bool, + error: Optional[str] = None, + ): + """Завершение задачи (успех или ошибка с ретраем).""" + # TODO: UPDATE status в зависимости от success и attempt + # TODO: снятие advisory lock + raise NotImplementedError + + async def check_cancel_requested( + self, + session: AsyncSession, + job_id: UUID, + ) -> bool: + """Проверка флага cancel_requested.""" + # TODO: SELECT cancel_requested + raise NotImplementedError + diff --git a/src/dataloader/workers/manager.py b/src/dataloader/workers/manager.py new file mode 100644 index 0000000..898d30c --- /dev/null +++ b/src/dataloader/workers/manager.py @@ -0,0 +1,53 @@ +"""Создание asyncio Tasks воркеров по конфигу.""" + +import asyncio +import json +from typing import List, Dict, Any +from ..config import Settings, PGSettings +from .base import BaseWorker + + +class WorkerManager: + """Менеджер воркеров: создание и управление воркерами по конфигу.""" + + def __init__(self, settings: Settings): + self.settings = settings + self.workers: List[asyncio.Task] = [] + self.worker_configs: List[Dict[str, Any]] = [] + self._shutdown_event = asyncio.Event() + + def _parse_workers_config(self) -> List[Dict[str, Any]]: + """Парсинг WORKERS_JSON из конфига.""" + if not self.settings.workers_json: + return [] + try: + return json.loads(self.settings.workers_json) + except json.JSONDecodeError: + # TODO: логирование ошибки + return [] + + async def start(self): + """Запуск всех воркеров.""" + self.worker_configs = self._parse_workers_config() + for config in self.worker_configs: + queue = config.get("queue") + concurrency = config.get("concurrency", 1) + + for _ in range(concurrency): + worker = BaseWorker( + queue=queue, + settings=self.settings, + shutdown_event=self._shutdown_event, + ) + task = asyncio.create_task(worker.run()) + self.workers.append(task) + + async def shutdown(self, timeout: float = 30.0): + """Мягкая остановка воркеров.""" + self._shutdown_event.set() + # Ждем завершения всех воркеров с таймаутом + await asyncio.wait_for( + asyncio.gather(*self.workers, return_exceptions=True), + timeout=timeout, + ) + diff --git a/src/dataloader/workers/pipelines/__init__.py b/src/dataloader/workers/pipelines/__init__.py new file mode 100644 index 0000000..90b3fe0 --- /dev/null +++ b/src/dataloader/workers/pipelines/__init__.py @@ -0,0 +1,2 @@ +"""Модуль пайплайнов обработки задач.""" + diff --git a/src/dataloader/workers/pipelines/registry.py b/src/dataloader/workers/pipelines/registry.py new file mode 100644 index 0000000..e20d28c --- /dev/null +++ b/src/dataloader/workers/pipelines/registry.py @@ -0,0 +1,47 @@ +"""Реестр обработчиков по task.""" + +from typing import Dict, Callable, Any, Awaitable, Optional +from abc import ABC, abstractmethod + + +class Pipeline(ABC): + """Базовый класс для пайплайна обработки задачи.""" + + @abstractmethod + async def execute(self, args: Dict[str, Any]) -> Dict[str, Any]: + """ + Выполнение задачи. + Возвращает результат (например, progress). + """ + pass + + +class PipelineRegistry: + """Реестр обработчиков задач по типу task.""" + + def __init__(self): + self._pipelines: Dict[str, type[Pipeline]] = {} + + def register(self, task: str, pipeline_class: type[Pipeline]): + """Регистрация обработчика для типа задачи.""" + self._pipelines[task] = pipeline_class + + def get_pipeline(self, task: str) -> Optional[type[Pipeline]]: + """Получить класс обработчика для типа задачи.""" + return self._pipelines.get(task) + + async def execute( + self, + task: str, + args: Dict[str, Any], + ) -> Dict[str, Any]: + """ + Выполнить задачу через зарегистрированный обработчик. + """ + pipeline_class = self.get_pipeline(task) + if pipeline_class is None: + raise ValueError(f"Pipeline for task '{task}' not found") + + pipeline = pipeline_class() + return await pipeline.execute(args) +