feat: add manager

itqop 2025-11-05 02:04:56 +03:00
parent 18cbbe00d3
commit ad12343784
5 changed files with 273 additions and 168 deletions

View File

@@ -1,3 +1,6 @@
# src/dataloader/api/__init__.py
from __future__ import annotations
from collections.abc import AsyncGenerator
import contextlib
import typing as tp
@@ -8,31 +11,35 @@ from .metric_router import router as metric_router
from .middleware import log_requests
from .os_router import router as service_router
from .v1 import router as v1_router
from dataloader.context import APP_CTX
from dataloader.workers.manager import build_manager_from_env, WorkerManager
_manager: WorkerManager | None = None
@contextlib.asynccontextmanager
async def lifespan(app: tp.Any) -> AsyncGenerator[None, None]:
from dataloader.context import APP_CTX
"""
Application lifecycle: initialize the context and start/stop the worker manager.
"""
global _manager
await APP_CTX.on_startup()
yield
await APP_CTX.on_shutdown()
_manager = build_manager_from_env()
await _manager.start()
try:
yield
finally:
if _manager is not None:
await _manager.stop()
_manager = None
await APP_CTX.on_shutdown()
app_main = FastAPI(title="Data Gateway", lifespan=lifespan)
app_main.middleware("http")(log_requests)
app_main.include_router(service_router, tags=["Openshift dataloader routes"])
app_main.include_router(metric_router, tags=["Like/dislike metric dataloader routes"])
app_main.include_router(v1_router, prefix="/api/v1", tags=["dataloader"])
app_main.include_router(
service_router, tags=["Openshift dataloader routes"]
)
app_main.include_router(
metric_router, tags=["Like/dislike metric dataloader routes"]
)
app_main.include_router(
v1_router, prefix="/api/v1", tags=["dataloader"]
)
__all__ = [
"app_main",
]
__all__ = ["app_main"]

View File

@@ -1,71 +1,110 @@
"""Общий PG-воркер: claim/lease/heartbeat/retry."""
# src/dataloader/workers/base.py
from __future__ import annotations
import asyncio
from typing import Optional
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
from contextlib import AsyncExitStack
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import AsyncIterator, Callable, Optional
from ..config import Settings, PGSettings
from ..storage.db import Database
from .pipelines.registry import PipelineRegistry
from dataloader.context import APP_CTX
from dataloader.storage.db import get_sessionmaker
from dataloader.storage.repositories import QueueRepository
from dataloader.workers.pipelines.registry import resolve as resolve_pipeline
class BaseWorker:
"""Базовый воркер для обработки задач из очереди."""
def __init__(
self,
queue: str,
settings: Settings,
shutdown_event: asyncio.Event,
):
self.queue = queue
self.settings = settings
self.shutdown_event = shutdown_event
self.db: Optional[Database] = None
self.current_job_id: Optional[UUID] = None
self.current_lock_key: Optional[str] = None
async def run(self):
"""Основной цикл воркера."""
# TODO: инициализация БД, подключение к LISTEN/NOTIFY
# TODO: цикл claim -> heartbeat -> execute -> finish
raise NotImplementedError
async def claim_job(self, session: AsyncSession) -> Optional[dict]:
@dataclass(frozen=True)
class WorkerConfig:
"""
Worker configuration.
"""
queue: str
heartbeat_sec: int
claim_backoff_sec: int
class PGWorker:
"""
Base asynchronous Postgres queue worker.
"""
def __init__(self, cfg: WorkerConfig, stop_event: asyncio.Event) -> None:
self._cfg = cfg
self._stop = stop_event
self._log = APP_CTX.get_logger()
self._sm = get_sessionmaker()
async def run(self) -> None:
"""
Claim a single job from the queue.
Returns a dict with job_id, task, args, lock_key, or None.
Main loop: wait → claim → execute → finish.
"""
# TODO: implement SELECT ... FOR UPDATE SKIP LOCKED
# TODO: check the advisory lock on lock_key
# TODO: back off if the advisory lock is held
raise NotImplementedError
async def heartbeat(self, session: AsyncSession, job_id: UUID):
"""Обновление heartbeat для задачи."""
# TODO: UPDATE heartbeat_at и lease_expires_at
raise NotImplementedError
async def finish_job(
self,
session: AsyncSession,
job_id: UUID,
success: bool,
error: Optional[str] = None,
):
"""Завершение задачи (успех или ошибка с ретраем)."""
# TODO: UPDATE status в зависимости от success и attempt
# TODO: снятие advisory lock
raise NotImplementedError
async def check_cancel_requested(
self,
session: AsyncSession,
job_id: UUID,
) -> bool:
"""Проверка флага cancel_requested."""
# TODO: SELECT cancel_requested
raise NotImplementedError
self._log.info(f"worker.start queue={self._cfg.queue}")
while not self._stop.is_set():
claimed = await self._claim_and_execute_once()
if not claimed:
await self._listen_or_sleep(self._cfg.claim_backoff_sec)
self._log.info(f"worker.stop queue={self._cfg.queue}")
async def _listen_or_sleep(self, timeout_sec: int) -> None:
"""
Wait for new jobs, with a timeout.
"""
try:
await asyncio.wait_for(self._stop.wait(), timeout=timeout_sec)
except asyncio.TimeoutError:
return
async def _claim_and_execute_once(self) -> bool:
"""
Perform one claim attempt and process the job.
"""
async with AsyncExitStack() as stack:
s = await stack.enter_async_context(self._sm())
repo = QueueRepository(s)
row = await repo.claim_one(self._cfg.queue)
if not row:
await s.commit()
return False
job_id = row["job_id"]
ttl = int(row["lease_ttl_sec"])
task = row["task"]
args = row["args"]
try:
await self._execute_with_heartbeat(job_id, ttl, self._pipeline(task, args))
await repo.finish_ok(job_id)
return True
except asyncio.CancelledError:
await repo.finish_fail_or_retry(job_id, "cancelled")
raise
except Exception as e:
await repo.finish_fail_or_retry(job_id, str(e))
return True
async def _execute_with_heartbeat(self, job_id: str, ttl: int, it: AsyncIterator[None]) -> None:
"""
Run the pipeline, emitting heartbeats.
"""
next_hb = datetime.now(timezone.utc) + timedelta(seconds=self._cfg.heartbeat_sec)
async for _ in it:
if datetime.now(timezone.utc) >= next_hb:
async with self._sm() as s_hb:
await QueueRepository(s_hb).heartbeat(job_id, ttl)
next_hb = datetime.now(timezone.utc) + timedelta(seconds=self._cfg.heartbeat_sec)
if self._stop.is_set():
raise asyncio.CancelledError()
async def _pipeline(self, task: str, args: dict) -> AsyncIterator[None]:
"""
Invoke the registered pipeline by task name.
"""
fn: Callable[[dict], object] = resolve_pipeline(task)
res = fn(args)
if hasattr(res, "__aiter__"):
async for _ in res: # type: ignore[func-returns-value]
yield
elif asyncio.iscoroutine(res):
await res # type: ignore[arg-type]
yield
else:
yield
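
QueueRepository itself is not part of this diff. A minimal sketch of the claim and heartbeat statements it implies, following the removed TODOs (FOR UPDATE SKIP LOCKED, heartbeat_at / lease_expires_at); the dl_jobs table and its columns are assumptions, not taken from this commit.

from sqlalchemy import text

# Hypothetical claim: take one queued job, mark it running, extend its lease.
CLAIM_ONE = text("""
    UPDATE dl_jobs
       SET status = 'running',
           attempt = attempt + 1,
           lease_expires_at = now() + make_interval(secs => lease_ttl_sec)
     WHERE job_id = (
            SELECT job_id
              FROM dl_jobs
             WHERE queue = :queue AND status = 'queued'
             ORDER BY created_at
             LIMIT 1
             FOR UPDATE SKIP LOCKED
           )
 RETURNING job_id, task, args, lock_key, lease_ttl_sec
""")

# Hypothetical heartbeat: refresh heartbeat_at and push the lease forward.
HEARTBEAT = text("""
    UPDATE dl_jobs
       SET heartbeat_at = now(),
           lease_expires_at = now() + make_interval(secs => :ttl)
     WHERE job_id = :job_id
""")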

View File

@@ -1,53 +1,106 @@
"""Создание asyncio Tasks воркеров по конфигу."""
# src/dataloader/workers/manager.py
from __future__ import annotations
import asyncio
import json
from typing import List, Dict, Any
from ..config import Settings, PGSettings
from .base import BaseWorker
import contextlib
from dataclasses import dataclass
from typing import Any
from dataloader.context import APP_CTX
from dataloader.config import APP_CONFIG
from dataloader.storage.db import get_sessionmaker
from dataloader.storage.repositories import QueueRepository
from dataloader.workers.base import PGWorker, WorkerConfig
@dataclass(frozen=True)
class WorkerSpec:
"""
Configuration of the worker pool for a queue.
"""
queue: str
concurrency: int
class WorkerManager:
"""Менеджер воркеров: создание и управление воркерами по конфигу."""
def __init__(self, settings: Settings):
self.settings = settings
self.workers: List[asyncio.Task] = []
self.worker_configs: List[Dict[str, Any]] = []
self._shutdown_event = asyncio.Event()
def _parse_workers_config(self) -> List[Dict[str, Any]]:
"""Парсинг WORKERS_JSON из конфига."""
if not self.settings.workers_json:
return []
try:
return json.loads(self.settings.workers_json)
except json.JSONDecodeError:
# TODO: log the error
return []
async def start(self):
"""Запуск всех воркеров."""
self.worker_configs = self._parse_workers_config()
for config in self.worker_configs:
queue = config.get("queue")
concurrency = config.get("concurrency", 1)
for _ in range(concurrency):
worker = BaseWorker(
queue=queue,
settings=self.settings,
shutdown_event=self._shutdown_event,
)
task = asyncio.create_task(worker.run())
self.workers.append(task)
async def shutdown(self, timeout: float = 30.0):
"""Мягкая остановка воркеров."""
self._shutdown_event.set()
# Wait for all workers to finish, with a timeout
await asyncio.wait_for(
asyncio.gather(*self.workers, return_exceptions=True),
timeout=timeout,
"""
Управляет жизненным циклом асинхронных воркеров.
"""
def __init__(self, specs: list[WorkerSpec]) -> None:
self._log = APP_CTX.get_logger()
self._specs = specs
self._stop = asyncio.Event()
self._tasks: list[asyncio.Task] = []
self._reaper_task: asyncio.Task | None = None
async def start(self) -> None:
"""
Start the workers and the background reaper task.
"""
hb = int(APP_CONFIG.dl.dl_heartbeat_sec)
backoff = int(APP_CONFIG.dl.dl_claim_backoff_sec)
for spec in self._specs:
for i in range(max(1, spec.concurrency)):
cfg = WorkerConfig(queue=spec.queue, heartbeat_sec=hb, claim_backoff_sec=backoff)
t = asyncio.create_task(PGWorker(cfg, self._stop).run(), name=f"worker:{spec.queue}:{i}")
self._tasks.append(t)
self._reaper_task = asyncio.create_task(self._reaper_loop(), name="reaper")
self._log.info(
"worker_manager.started",
extra={"specs": [spec.__dict__ for spec in self._specs], "total_tasks": len(self._tasks)},
)
async def stop(self) -> None:
"""
Stop the workers and the reaper.
"""
self._stop.set()
for t in self._tasks:
t.cancel()
await asyncio.gather(*self._tasks, return_exceptions=True)
self._tasks.clear()
if self._reaper_task:
self._reaper_task.cancel()
with contextlib.suppress(Exception):
await self._reaper_task
self._reaper_task = None
self._log.info("worker_manager.stopped")
async def _reaper_loop(self) -> None:
"""
Background loop that returns lost jobs to the queue.
"""
period = int(APP_CONFIG.dl.dl_reaper_period_sec)
sm = get_sessionmaker()
while not self._stop.is_set():
try:
async with sm() as s:
repo = QueueRepository(s)
ids = await repo.requeue_lost()
if ids:
APP_CTX.get_logger().info("reaper.requeued", extra={"count": len(ids)})
except Exception as e:
APP_CTX.get_logger().error("reaper.error", extra={"error": str(e)})
try:
await asyncio.wait_for(self._stop.wait(), timeout=period)
except asyncio.TimeoutError:
continue
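
repo.requeue_lost is likewise not shown in this diff. Under the same assumed dl_jobs schema as above, a sketch of what the reaper implies: return expired leases to the queue and report their ids.

# Hypothetical requeue of jobs whose lease expired while marked running.
REQUEUE_LOST = text("""
    UPDATE dl_jobs
       SET status = 'queued'
     WHERE status = 'running'
       AND lease_expires_at < now()
 RETURNING job_id
""")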
def build_manager_from_env() -> WorkerManager:
"""
Builds a WorkerManager from WORKERS_JSON.
"""
specs: list[WorkerSpec] = []
for item in APP_CONFIG.dl.parsed_workers():
q = str(item.get("queue", "")).strip()
c = int(item.get("concurrency", 1))
if q:
specs.append(WorkerSpec(queue=q, concurrency=max(1, c)))
return WorkerManager(specs)
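
build_manager_from_env implies the following WORKERS_JSON shape; a runnable sketch of the same parsing rules (the queue names are illustrative only):

import json

raw = '[{"queue": "dl.default", "concurrency": 2}, {"queue": "dl.heavy"}]'
for item in json.loads(raw):
    q = str(item.get("queue", "")).strip()
    c = int(item.get("concurrency", 1))
    print(q, max(1, c))
# dl.default 2
# dl.heavy 1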

View File

@@ -0,0 +1,20 @@
# src/dataloader/workers/pipelines/noop.py
from __future__ import annotations
import asyncio
from typing import AsyncIterator
from .registry import register
@register("noop")
async def noop(args: dict) -> AsyncIterator[None]:
"""
Reference pipeline with no side effects; simulates 3 steps.
"""
await asyncio.sleep(float(args.get("sleep1", 2)))
yield
await asyncio.sleep(float(args.get("sleep2", 2)))
yield
await asyncio.sleep(float(args.get("sleep3", 2)))
yield
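
A quick way to drive this pipeline outside a worker (sketch; sleeps zeroed for speed):

import asyncio

from dataloader.workers.pipelines.noop import noop

async def main() -> None:
    steps = 0
    async for _ in noop({"sleep1": 0, "sleep2": 0, "sleep3": 0}):
        steps += 1  # PGWorker checks heartbeat deadlines between these yields
    assert steps == 3

asyncio.run(main())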

View File

@@ -1,47 +1,33 @@
"""Реестр обработчиков по task."""
# src/dataloader/workers/pipelines/registry.py
from __future__ import annotations
from typing import Dict, Callable, Any, Awaitable, Optional
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Iterable
_Registry: Dict[str, Callable[[dict[str, Any]], Any]] = {}
class Pipeline(ABC):
"""Базовый класс для пайплайна обработки задачи."""
@abstractmethod
async def execute(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""
Execute the task.
Returns a result (e.g., progress).
"""
pass
def register(task: str) -> Callable[[Callable[[dict[str, Any]], Any]], Callable[[dict[str, Any]], Any]]:
"""
Registers a pipeline handler under the task name.
"""
def _wrap(fn: Callable[[dict[str, Any]], Any]) -> Callable[[dict[str, Any]], Any]:
_Registry[task] = fn
return fn
return _wrap
class PipelineRegistry:
"""Реестр обработчиков задач по типу task."""
def __init__(self):
self._pipelines: Dict[str, type[Pipeline]] = {}
def register(self, task: str, pipeline_class: type[Pipeline]):
"""Регистрация обработчика для типа задачи."""
self._pipelines[task] = pipeline_class
def get_pipeline(self, task: str) -> Optional[type[Pipeline]]:
"""Получить класс обработчика для типа задачи."""
return self._pipelines.get(task)
async def execute(
self,
task: str,
args: Dict[str, Any],
) -> Dict[str, Any]:
"""
Execute a task via the registered handler.
"""
pipeline_class = self.get_pipeline(task)
if pipeline_class is None:
raise ValueError(f"Pipeline for task '{task}' not found")
pipeline = pipeline_class()
return await pipeline.execute(args)
def resolve(task: str) -> Callable[[dict[str, Any]], Any]:
"""
Returns the pipeline handler for the given task name.
"""
try:
return _Registry[task]
except KeyError:
raise KeyError(f"pipeline not found: {task}")
def tasks() -> Iterable[str]:
"""
Returns the registered task names.
"""
return _Registry.keys()
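
Usage sketch for the new function-based registry (the "echo" task name is illustrative):

from dataloader.workers.pipelines.registry import register, resolve, tasks

@register("echo")
async def echo(args: dict) -> None:
    print(args)  # plain coroutines are also accepted by PGWorker._pipeline

assert resolve("echo") is echo
assert "echo" in tasks()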