feat: add storage

This commit is contained in:
itqop 2025-11-05 01:34:42 +03:00
parent d0e5897984
commit 33d8f5ab8b
17 changed files with 868 additions and 21 deletions

80
TODO.md Normal file
View File

@ -0,0 +1,80 @@
## План внедрения
1. **Шаблон + каркас пакета** - сделано
* Создать структуру из ТЗ (один пакет `dataloader/` по src-layout).
* Подтянуть `rest_template.md` артефакты: `os_router.py`, `middleware.py`, `logger/*`.
* `pyproject.toml`: `fastapi`, `uvicorn`, `pydantic-settings`, `sqlalchemy>=2, async`, `psycopg[binary,pool]` или `asyncpg`, `httpx`, `pytest`, `pytest-asyncio`, `httpx[cli]`.
* **Критерий:** `uvicorn dataloader.__main__:app` поднимается, `/health` отдаёт 200.
2. **Конфиг и контекст** - сделано
* `config.py`: `AppSettings` (DSN, тайминги, WORKERS_JSON).
* `context.py`: `AppContext`, создание `AsyncEngine` и `async_sessionmaker`, DI.
* **Критерий:** `/status` возвращает версию/uptime, движок создаётся на старте без попыток коннекта в `/health`.
3. **Хранилище очереди** - в работе
* `storage/db.py`: фабрики engine/sessionmaker.
* `storage/repositories.py`: методы
* `create_or_get(req)`,
* `get_status(job_id)`,
* `cancel(job_id)`,
* `requeue_lost(now)`,
* вспомогательные `claim_one(queue)`, `heartbeat(job_id, ttl)`, `finish_ok(job_id)`, `finish_fail_or_retry(job_id, err)`.
* Только чистый SQL (как в ТЗ), транзакция на операцию.
* **Критерий:** unit-интегра тест «поставил-прочитал-отменил» проходит.
4. **API v1**
* `api/v1/schemas.py`: `TriggerJobRequest/Response`, `JobStatusResponse`.
* `api/v1/service.py`: бизнес-слой над репозиторием.
* `api/v1/router.py`: `POST /jobs/trigger`, `GET /jobs/{id}/status`, `POST /jobs/{id}/cancel`.
* **Критерий:** ручки соответствуют контрактам, идемпотентность по `idempotency_key` работает.
5. **Базовый воркер**
* `workers/base.py`: класс `PGWorker` с циклами `listen_or_sleep → claim → advisory_lock → _pipeline → heartbeat → finish`.
* Идём строго по SQL из ТЗ: `FOR UPDATE SKIP LOCKED`, lease/heartbeat, backoff при lock.
* **Критерий:** локальный мок-пайплайн выполняется, статус `succeeded`.
6. **Менеджер воркеров**
* `workers/manager.py`: парсинг `WORKERS_JSON`, создание `asyncio.Task` на воркеры; мягкая остановка на shutdown.
* Подключение в `__main__.py` через FastAPI `on_startup/on_shutdown`.
* **Критерий:** при старте создаются нужные таски, при SIGTERM корректно гасим.
7. **Реестр пайплайнов**
* `workers/pipelines/registry.py`: `@register(task)`, `resolve(task)`.
* Пустой эталонный пайплайн (no-op, имитирует 23 чанка).
* **Критерий:** задача с `task="noop"` исполняется через реестр.
8. **Reaper**
* Фоновая async-задача в приложении: `requeue_lost` раз в `DL_REAPER_PERIOD_SEC`.
* **Критерий:** задачи с протухшим `lease_expires_at` возвращаются в `queued`.
9. **Интеграционные тесты**
* `tests/integration_tests/v1_api/test_service.py`:
* trigger → status (queued),
* воркер подхватил → status (running),
* done → status (succeeded),
* cancel во время пайплайна → корректная реакция.
* **Критерий:** тесты зелёные в CI.
10. **Dockerfile и запуск**
* Slim образ на Python 3.11/3.12, `uvicorn` entrypoint.
* ENV-пример `.env`, README с запуском.
* **Критерий:** контейнер стартует, воркеры работают, API доступно.
11. **Наблюдаемость**
* Логи в формате шаблона (структурные, маскирование).
* Простая сводка в `/status` (кол-во активных воркеров, конфиг таймингов).
* **Критерий:** видно ключевые переходы статусов и ошибки пайплайнов.

View File

@ -0,0 +1,24 @@
"""Исключения для API v1."""
from fastapi import HTTPException, status
class JobNotFoundError(HTTPException):
"""Задача не найдена."""
def __init__(self, job_id: str):
super().__init__(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Job {job_id} not found"
)
class JobAlreadyCanceledError(HTTPException):
"""Задача уже отменена."""
def __init__(self, job_id: str):
super().__init__(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Job {job_id} is already canceled or finished"
)

View File

@ -0,0 +1,2 @@
"""Модели данных для API v1."""

View File

@ -0,0 +1,49 @@
"""Pydantic схемы для API v1: запросы и ответы."""
from pydantic import BaseModel, Field
from typing import Optional, Dict, Any
from datetime import datetime
from uuid import UUID
class JobTriggerRequest(BaseModel):
"""Запрос на постановку задачи в очередь."""
queue: str = Field(..., description="Название очереди")
task: str = Field(..., description="Тип задачи")
args: Optional[Dict[str, Any]] = Field(default={}, description="Аргументы задачи")
idempotency_key: Optional[str] = Field(None, description="Ключ идемпотентности")
lock_key: str = Field(..., description="Ключ блокировки")
partition_key: Optional[str] = Field(default="", description="Ключ партиционирования")
priority: Optional[int] = Field(default=100, description="Приоритет задачи")
available_at: Optional[datetime] = Field(None, description="Время доступности задачи (RFC3339)")
class JobTriggerResponse(BaseModel):
"""Ответ на постановку задачи."""
job_id: UUID = Field(..., description="Идентификатор задачи")
status: str = Field(..., description="Статус задачи")
class JobStatusResponse(BaseModel):
"""Ответ со статусом задачи."""
job_id: UUID
status: str
attempt: int
started_at: Optional[datetime] = None
finished_at: Optional[datetime] = None
heartbeat_at: Optional[datetime] = None
error: Optional[str] = None
progress: Dict[str, Any] = Field(default_factory=dict)
class JobCancelResponse(BaseModel):
"""Ответ на отмену задачи."""
job_id: UUID
status: str
attempt: int
started_at: Optional[datetime] = None
finished_at: Optional[datetime] = None
heartbeat_at: Optional[datetime] = None
error: Optional[str] = None
progress: Dict[str, Any] = Field(default_factory=dict)

View File

@ -0,0 +1,31 @@
"""Бизнес-логика для API v1."""
from typing import Optional
from uuid import UUID
from datetime import datetime
from .schemas import JobTriggerRequest, JobStatusResponse, JobCancelResponse
from ...storage.repositories import JobRepository
class JobService:
"""Сервис для работы с задачами."""
def __init__(self, job_repo: JobRepository):
self.job_repo = job_repo
async def trigger_job(self, request: JobTriggerRequest) -> JobTriggerResponse:
"""Постановка задачи в очередь."""
# TODO: реализовать идемпотентную постановку через репозиторий
raise NotImplementedError
async def get_job_status(self, job_id: UUID) -> Optional[JobStatusResponse]:
"""Получение статуса задачи."""
# TODO: реализовать через репозиторий
raise NotImplementedError
async def cancel_job(self, job_id: UUID) -> Optional[JobCancelResponse]:
"""Отмена задачи."""
# TODO: реализовать через репозиторий
raise NotImplementedError

View File

@ -0,0 +1,2 @@
"""Утилиты для API v1."""

View File

@ -1,5 +1,8 @@
# src/dataloader/config.py
import os
import json
from logging import DEBUG, INFO
from typing import Any
from dotenv import load_dotenv
from pydantic import Field
@ -66,6 +69,9 @@ class LogSettings(BaseAppSettings):
class PGSettings(BaseSettings):
"""
Настройки подключения к Postgres.
"""
host: str = Field(validation_alias="PG_HOST", default="localhost")
port: int = Field(validation_alias="PG_PORT", default=5432)
user: str = Field(validation_alias="PG_USER", default="postgres")
@ -81,22 +87,55 @@ class PGSettings(BaseSettings):
@property
def url(self) -> str:
"""Автоматически генерируется SQLAlchemy URL для подключения"""
"""
Строка подключения SQLAlchemy (async).
"""
return f"postgresql+asyncpg://{self.user}:{self.password}@{self.host}:{self.port}/{self.database}"
class Settings(BaseSettings):
"""
Настройки очереди и воркеров.
"""
dl_db_dsn: str = Field(validation_alias="DL_DB_DSN", default="")
workers_json: str = Field(validation_alias="WORKERS_JSON", default="[]")
dl_heartbeat_sec: int = Field(validation_alias="DL_HEARTBEAT_SEC", default=10)
dl_default_lease_ttl_sec: int = Field(validation_alias="DL_DEFAULT_LEASE_TTL_SEC", default=60)
dl_reaper_period_sec: int = Field(validation_alias="DL_REAPER_PERIOD_SEC", default=10)
dl_claim_backoff_sec: int = Field(validation_alias="DL_CLAIM_BACKOFF_SEC", default=15)
def parsed_workers(self) -> list[dict[str, Any]]:
"""
Возвращает список конфигураций воркеров из WORKERS_JSON.
"""
try:
data = json.loads(self.workers_json or "[]")
return [d for d in data if isinstance(d, dict)]
except Exception:
return []
class Secrets:
"""
Класс, агрегирующий все настройки приложения.
Агрегатор настроек приложения.
"""
app: AppSettings = AppSettings()
log: LogSettings = LogSettings()
pg: PGSettings = PGSettings()
dl: Settings = Settings()
@property
def resolved_dsn(self) -> str:
"""
Возвращает DSN для очереди: DL_DB_DSN или URL из PG.
"""
return self.dl.dl_db_dsn or self.pg.url
APP_CONFIG = Secrets()
__all__ = [
"Settings",
"Secrets",
"APP_CONFIG",
]

View File

@ -1,21 +1,19 @@
# Реализация паттерна AppContext — единая точка доступа к зависимостям
from dataloader.base import Singleton
# src/dataloader/context.py
from __future__ import annotations
import typing
import pytz
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
from dataloader.base import Singleton
from dataloader.config import APP_CONFIG, Secrets
from dataloader.logger import ContextVarsContainer, LoggerConfigurator
import pytz
class AppContext(metaclass=Singleton):
@property
def logger(self) -> "typing.Any":
return self._logger_manager.async_logger
"""
Контекст приложения: логгер, таймзона, подключение к БД и фабрика сессий.
"""
def __init__(self, secrets: Secrets) -> None:
self.timezone = pytz.timezone(secrets.app.timezone)
self.context_vars_container = ContextVarsContainer()
@ -30,33 +28,79 @@ class AppContext(metaclass=Singleton):
timezone=self.timezone,
)
self.pg = secrets.pg
self.dl = secrets.dl
self._engine: AsyncEngine | None = None
self._sessionmaker: async_sessionmaker[AsyncSession] | None = None
self.logger.info("App context initialized.")
@property
def logger(self) -> "typing.Any":
"""
Возвращает асинхронный логгер.
"""
return self._logger_manager.async_logger
@property
def engine(self) -> AsyncEngine:
"""
Возвращает текущий AsyncEngine.
"""
assert self._engine is not None, "Engine is not initialized"
return self._engine
@property
def sessionmaker(self) -> async_sessionmaker[AsyncSession]:
"""
Возвращает фабрику асинхронных сессий.
"""
assert self._sessionmaker is not None, "Sessionmaker is not initialized"
return self._sessionmaker
def get_logger(self) -> "typing.Any":
"""
Возвращает логгер.
"""
return self.logger
def get_context_vars_container(self) -> ContextVarsContainer:
"""
Возвращает контейнер контекстных переменных логгера.
"""
return self.context_vars_container
def get_pytz_timezone(self):
"""
Возвращает таймзону приложения.
"""
return self.timezone
async def on_startup(self) -> None:
"""
Инициализирует подключение к БД и готовит фабрику сессий.
"""
self.logger.info("Application is starting up.")
dsn = APP_CONFIG.resolved_dsn
self._engine = create_async_engine(
dsn,
pool_size=self.pg.pool_size if self.pg.use_pool else None,
max_overflow=self.pg.max_overflow if self.pg.use_pool else 0,
pool_recycle=self.pg.pool_recycle if self.pg.use_pool else -1,
)
self._sessionmaker = async_sessionmaker(self._engine, expire_on_commit=False, class_=AsyncSession)
self.logger.info("All connections checked. Application is up and ready.")
async def on_shutdown(self) -> None:
"""
Останавливает подсистемы и освобождает ресурсы.
"""
self.logger.info("Application is shutting down.")
if self._engine is not None:
await self._engine.dispose()
self._engine = None
self._sessionmaker = None
self._logger_manager.remove_logger_handlers()
APP_CTX = AppContext(APP_CONFIG)
__all__ = ["APP_CTX"]

View File

@ -0,0 +1,2 @@
"""Исключения уровня приложения."""

View File

@ -0,0 +1,2 @@
"""Модуль для работы с хранилищем данных."""

View File

@ -0,0 +1,39 @@
# src/dataloader/storage/db.py
from __future__ import annotations
from typing import AsyncIterator
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
from dataloader.context import APP_CTX
class Base(DeclarativeBase):
"""
Базовый класс моделей ORM.
"""
pass
def get_engine() -> AsyncEngine:
"""
Возвращает AsyncEngine, инициализированный в контексте приложения.
"""
return APP_CTX.engine
def get_sessionmaker() -> async_sessionmaker[AsyncSession]:
"""
Возвращает фабрику асинхронных сессий.
"""
return APP_CTX.sessionmaker
async def session_scope() -> AsyncIterator[AsyncSession]:
"""
Асинхронный контекст жизненного цикла сессии.
"""
sm = get_sessionmaker()
async with sm() as s:
yield s

View File

@ -0,0 +1,358 @@
# src/dataloader/storage/repositories.py
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Optional
from sqlalchemy import BigInteger, String, Text, select, func, update
from sqlalchemy.dialects.postgresql import JSONB, ENUM
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Mapped, mapped_column
from dataloader.storage.db import Base
dl_status_enum = ENUM(
"queued",
"running",
"succeeded",
"failed",
"canceled",
"lost",
name="dl_status",
create_type=False,
native_enum=True,
)
class DLJob(Base):
"""
Модель очереди dl_jobs.
"""
__tablename__ = "dl_jobs"
job_id: Mapped[str] = mapped_column(String(36), primary_key=True)
queue: Mapped[str] = mapped_column(Text, nullable=False)
task: Mapped[str] = mapped_column(Text, nullable=False)
args: Mapped[dict[str, Any]] = mapped_column(JSONB, default=dict, nullable=False)
idempotency_key: Mapped[Optional[str]] = mapped_column(Text, unique=True)
lock_key: Mapped[str] = mapped_column(Text, nullable=False)
partition_key: Mapped[str] = mapped_column(Text, default="", nullable=False)
priority: Mapped[int] = mapped_column(nullable=False, default=100)
available_at: Mapped[datetime] = mapped_column(nullable=False)
status: Mapped[str] = mapped_column(dl_status_enum, nullable=False, default="queued")
attempt: Mapped[int] = mapped_column(nullable=False, default=0)
max_attempts: Mapped[int] = mapped_column(nullable=False, default=5)
lease_ttl_sec: Mapped[int] = mapped_column(nullable=False, default=60)
lease_expires_at: Mapped[Optional[datetime]] = mapped_column(nullable=True)
heartbeat_at: Mapped[Optional[datetime]] = mapped_column(nullable=True)
cancel_requested: Mapped[bool] = mapped_column(nullable=False, default=False)
progress: Mapped[dict[str, Any]] = mapped_column(JSONB, default=dict, nullable=False)
error: Mapped[Optional[str]] = mapped_column(Text)
producer: Mapped[Optional[str]] = mapped_column(Text)
consumer_group: Mapped[Optional[str]] = mapped_column(Text)
created_at: Mapped[datetime] = mapped_column(nullable=False)
started_at: Mapped[Optional[datetime]] = mapped_column(nullable=True)
finished_at: Mapped[Optional[datetime]] = mapped_column(nullable=True)
class DLJobEvent(Base):
"""
Модель журнала событий dl_job_events.
"""
__tablename__ = "dl_job_events"
event_id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
job_id: Mapped[str] = mapped_column(String(36), nullable=False)
queue: Mapped[str] = mapped_column(Text, nullable=False)
ts: Mapped[datetime] = mapped_column(nullable=False)
kind: Mapped[str] = mapped_column(Text, nullable=False)
payload: Mapped[Optional[dict[str, Any]]] = mapped_column(JSONB)
@dataclass(frozen=True)
class CreateJobRequest:
"""
Параметры постановки задачи.
"""
job_id: str
queue: str
task: str
args: dict[str, Any]
idempotency_key: Optional[str]
lock_key: str
partition_key: str
priority: int
available_at: datetime
max_attempts: int
lease_ttl_sec: int
producer: Optional[str]
consumer_group: Optional[str]
@dataclass(frozen=True)
class JobStatus:
"""
Снимок статуса задачи.
"""
job_id: str
status: str
attempt: int
started_at: Optional[datetime]
finished_at: Optional[datetime]
heartbeat_at: Optional[datetime]
error: Optional[str]
progress: dict[str, Any]
class QueueRepository:
"""
Репозиторий очереди и событий с полнотой ORM.
"""
def __init__(self, session: AsyncSession):
self.s = session
async def create_or_get(self, req: CreateJobRequest) -> tuple[str, str]:
"""
Идемпотентно создаёт запись в очереди и возвращает (job_id, status).
"""
if req.idempotency_key:
q = select(DLJob).where(DLJob.idempotency_key == req.idempotency_key)
r = await self.s.execute(q)
ex = r.scalar_one_or_none()
if ex:
return ex.job_id, ex.status
row = DLJob(
job_id=req.job_id,
queue=req.queue,
task=req.task,
args=req.args or {},
idempotency_key=req.idempotency_key,
lock_key=req.lock_key,
partition_key=req.partition_key or "",
priority=req.priority,
available_at=req.available_at,
status="queued",
attempt=0,
max_attempts=req.max_attempts,
lease_ttl_sec=req.lease_ttl_sec,
lease_expires_at=None,
heartbeat_at=None,
cancel_requested=False,
progress={},
error=None,
producer=req.producer,
consumer_group=req.consumer_group,
created_at=datetime.now(timezone.utc),
started_at=None,
finished_at=None,
)
self.s.add(row)
await self._append_event(req.job_id, req.queue, "queued", {"task": req.task})
await self.s.commit()
return req.job_id, "queued"
async def get_status(self, job_id: str) -> Optional[JobStatus]:
"""
Возвращает статус задачи.
"""
q = select(
DLJob.job_id,
DLJob.status,
DLJob.attempt,
DLJob.started_at,
DLJob.finished_at,
DLJob.heartbeat_at,
DLJob.error,
DLJob.progress,
).where(DLJob.job_id == job_id)
r = await self.s.execute(q)
m = r.first()
if not m:
return None
return JobStatus(
job_id=m.job_id,
status=m.status,
attempt=m.attempt,
started_at=m.started_at,
finished_at=m.finished_at,
heartbeat_at=m.heartbeat_at,
error=m.error,
progress=m.progress or {},
)
async def cancel(self, job_id: str) -> bool:
"""
Устанавливает флаг отмены для задачи.
"""
q = update(DLJob).where(DLJob.job_id == job_id).values(cancel_requested=True)
await self.s.execute(q)
await self._append_event(job_id, await self._resolve_queue(job_id), "cancel_requested", None)
await self.s.commit()
return True
async def claim_one(self, queue: str) -> Optional[dict[str, Any]]:
"""
Захватывает одну задачу из очереди с учётом блокировок и выставляет running.
"""
async with self.s.begin():
q = (
select(DLJob)
.where(
DLJob.status == "queued",
DLJob.queue == queue,
DLJob.available_at <= func.now(),
)
.order_by(DLJob.priority.asc(), DLJob.created_at.asc())
.with_for_update(skip_locked=True)
.limit(1)
)
r = await self.s.execute(q)
job: Optional[DLJob] = r.scalar_one_or_none()
if not job:
return None
job.status = "running"
job.started_at = job.started_at or datetime.now(timezone.utc)
job.attempt = int(job.attempt) + 1
job.heartbeat_at = datetime.now(timezone.utc)
job.lease_expires_at = datetime.now(timezone.utc) + timedelta(seconds=int(job.lease_ttl_sec))
ok = await self._try_advisory_lock(job.lock_key)
if not ok:
job.status = "queued"
job.available_at = datetime.now(timezone.utc) + timedelta(seconds=15)
return None
await self._append_event(job.job_id, job.queue, "picked", {"attempt": job.attempt})
return {
"job_id": job.job_id,
"queue": job.queue,
"task": job.task,
"args": job.args or {},
"lock_key": job.lock_key,
"partition_key": job.partition_key or "",
"lease_ttl_sec": int(job.lease_ttl_sec),
"attempt": int(job.attempt),
}
async def heartbeat(self, job_id: str, ttl_sec: int) -> None:
"""
Обновляет heartbeat и продлевает lease.
"""
now = datetime.now(timezone.utc)
q = (
update(DLJob)
.where(DLJob.job_id == job_id, DLJob.status == "running")
.values(heartbeat_at=now, lease_expires_at=now + timedelta(seconds=int(ttl_sec)))
)
await self.s.execute(q)
await self._append_event(job_id, await self._resolve_queue(job_id), "heartbeat", {"ttl": ttl_sec})
await self.s.commit()
async def finish_ok(self, job_id: str) -> None:
"""
Помечает задачу как выполненную успешно и снимает advisory-lock.
"""
job = await self._get(job_id)
if not job:
return
job.status = "succeeded"
job.finished_at = datetime.now(timezone.utc)
job.lease_expires_at = None
await self._append_event(job_id, job.queue, "succeeded", None)
await self._advisory_unlock(job.lock_key)
await self.s.commit()
async def finish_fail_or_retry(self, job_id: str, err: str) -> None:
"""
Помечает задачу как failed или возвращает в очередь с задержкой.
"""
job = await self._get(job_id)
if not job:
return
can_retry = int(job.attempt) < int(job.max_attempts)
if can_retry:
job.status = "queued"
job.available_at = datetime.now(timezone.utc) + timedelta(seconds=30 * int(job.attempt))
job.error = err
job.lease_expires_at = None
await self._append_event(job_id, job.queue, "requeue", {"attempt": job.attempt, "error": err})
else:
job.status = "failed"
job.error = err
job.finished_at = datetime.now(timezone.utc)
job.lease_expires_at = None
await self._append_event(job_id, job.queue, "failed", {"error": err})
await self._advisory_unlock(job.lock_key)
await self.s.commit()
async def requeue_lost(self, now: Optional[datetime] = None) -> list[str]:
"""
Возвращает протухшие running-задачи в очередь.
"""
now = now or datetime.now(timezone.utc)
q = (
select(DLJob)
.where(
DLJob.status == "running",
DLJob.lease_expires_at.is_not(None),
DLJob.lease_expires_at < now,
)
.with_for_update(skip_locked=True)
)
r = await self.s.execute(q)
rows = list(r.scalars().all())
ids: list[str] = []
for job in rows:
job.status = "queued"
job.available_at = now
job.lease_expires_at = None
ids.append(job.job_id)
await self._append_event(job.job_id, job.queue, "requeue_lost", None)
await self.s.commit()
return ids
async def _get(self, job_id: str) -> Optional[DLJob]:
"""
Возвращает ORM-объект задачи.
"""
r = await self.s.execute(select(DLJob).where(DLJob.job_id == job_id).with_for_update(skip_locked=True))
return r.scalar_one_or_none()
async def _resolve_queue(self, job_id: str) -> str:
"""
Возвращает имя очереди для события.
"""
r = await self.s.execute(select(DLJob.queue).where(DLJob.job_id == job_id))
v = r.scalar_one_or_none()
return v or ""
async def _append_event(self, job_id: str, queue: str, kind: str, payload: Optional[dict[str, Any]]) -> None:
"""
Добавляет запись в журнал событий.
"""
ev = DLJobEvent(
job_id=job_id,
queue=queue or "",
ts=datetime.now(timezone.utc),
kind=kind,
payload=payload or None,
)
self.s.add(ev)
async def _try_advisory_lock(self, lock_key: str) -> bool:
"""
Пытается получить advisory-lock в Postgres.
"""
r = await self.s.execute(select(func.pg_try_advisory_lock(func.hashtext(lock_key))))
return bool(r.scalar())
async def _advisory_unlock(self, lock_key: str) -> None:
"""
Снимает advisory-lock в Postgres.
"""
await self.s.execute(select(func.pg_advisory_unlock(func.hashtext(lock_key))))

View File

@ -0,0 +1,2 @@
"""Модуль воркеров для обработки задач."""

View File

@ -0,0 +1,71 @@
"""Общий PG-воркер: claim/lease/heartbeat/retry."""
import asyncio
from typing import Optional
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
from ..config import Settings, PGSettings
from ..storage.db import Database
from .pipelines.registry import PipelineRegistry
class BaseWorker:
"""Базовый воркер для обработки задач из очереди."""
def __init__(
self,
queue: str,
settings: Settings,
shutdown_event: asyncio.Event,
):
self.queue = queue
self.settings = settings
self.shutdown_event = shutdown_event
self.db: Optional[Database] = None
self.current_job_id: Optional[UUID] = None
self.current_lock_key: Optional[str] = None
async def run(self):
"""Основной цикл воркера."""
# TODO: инициализация БД, подключение к LISTEN/NOTIFY
# TODO: цикл claim -> heartbeat -> execute -> finish
raise NotImplementedError
async def claim_job(self, session: AsyncSession) -> Optional[dict]:
"""
Claim одной задачи из очереди.
Возвращает dict с job_id, task, args, lock_key или None.
"""
# TODO: реализовать SELECT ... FOR UPDATE SKIP LOCKED
# TODO: проверка advisory lock на lock_key
# TODO: backoff если advisory lock занят
raise NotImplementedError
async def heartbeat(self, session: AsyncSession, job_id: UUID):
"""Обновление heartbeat для задачи."""
# TODO: UPDATE heartbeat_at и lease_expires_at
raise NotImplementedError
async def finish_job(
self,
session: AsyncSession,
job_id: UUID,
success: bool,
error: Optional[str] = None,
):
"""Завершение задачи (успех или ошибка с ретраем)."""
# TODO: UPDATE status в зависимости от success и attempt
# TODO: снятие advisory lock
raise NotImplementedError
async def check_cancel_requested(
self,
session: AsyncSession,
job_id: UUID,
) -> bool:
"""Проверка флага cancel_requested."""
# TODO: SELECT cancel_requested
raise NotImplementedError

View File

@ -0,0 +1,53 @@
"""Создание asyncio Tasks воркеров по конфигу."""
import asyncio
import json
from typing import List, Dict, Any
from ..config import Settings, PGSettings
from .base import BaseWorker
class WorkerManager:
"""Менеджер воркеров: создание и управление воркерами по конфигу."""
def __init__(self, settings: Settings):
self.settings = settings
self.workers: List[asyncio.Task] = []
self.worker_configs: List[Dict[str, Any]] = []
self._shutdown_event = asyncio.Event()
def _parse_workers_config(self) -> List[Dict[str, Any]]:
"""Парсинг WORKERS_JSON из конфига."""
if not self.settings.workers_json:
return []
try:
return json.loads(self.settings.workers_json)
except json.JSONDecodeError:
# TODO: логирование ошибки
return []
async def start(self):
"""Запуск всех воркеров."""
self.worker_configs = self._parse_workers_config()
for config in self.worker_configs:
queue = config.get("queue")
concurrency = config.get("concurrency", 1)
for _ in range(concurrency):
worker = BaseWorker(
queue=queue,
settings=self.settings,
shutdown_event=self._shutdown_event,
)
task = asyncio.create_task(worker.run())
self.workers.append(task)
async def shutdown(self, timeout: float = 30.0):
"""Мягкая остановка воркеров."""
self._shutdown_event.set()
# Ждем завершения всех воркеров с таймаутом
await asyncio.wait_for(
asyncio.gather(*self.workers, return_exceptions=True),
timeout=timeout,
)

View File

@ -0,0 +1,2 @@
"""Модуль пайплайнов обработки задач."""

View File

@ -0,0 +1,47 @@
"""Реестр обработчиков по task."""
from typing import Dict, Callable, Any, Awaitable, Optional
from abc import ABC, abstractmethod
class Pipeline(ABC):
"""Базовый класс для пайплайна обработки задачи."""
@abstractmethod
async def execute(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""
Выполнение задачи.
Возвращает результат (например, progress).
"""
pass
class PipelineRegistry:
"""Реестр обработчиков задач по типу task."""
def __init__(self):
self._pipelines: Dict[str, type[Pipeline]] = {}
def register(self, task: str, pipeline_class: type[Pipeline]):
"""Регистрация обработчика для типа задачи."""
self._pipelines[task] = pipeline_class
def get_pipeline(self, task: str) -> Optional[type[Pipeline]]:
"""Получить класс обработчика для типа задачи."""
return self._pipelines.get(task)
async def execute(
self,
task: str,
args: Dict[str, Any],
) -> Dict[str, Any]:
"""
Выполнить задачу через зарегистрированный обработчик.
"""
pipeline_class = self.get_pipeline(task)
if pipeline_class is None:
raise ValueError(f"Pipeline for task '{task}' not found")
pipeline = pipeline_class()
return await pipeline.execute(args)