From ad123437847f01f4c3916c3d64684fb3705d50a9 Mon Sep 17 00:00:00 2001
From: itqop
Date: Wed, 5 Nov 2025 02:04:56 +0300
Subject: [PATCH] feat: add manager

---
 src/dataloader/api/__init__.py               |  43 +++--
 src/dataloader/workers/base.py               | 165 ++++++++++++-------
 src/dataloader/workers/manager.py            | 145 ++++++++++------
 src/dataloader/workers/pipelines/noop.py     |  20 +++
 src/dataloader/workers/pipelines/registry.py |  68 +++-----
 5 files changed, 273 insertions(+), 168 deletions(-)
 create mode 100644 src/dataloader/workers/pipelines/noop.py

diff --git a/src/dataloader/api/__init__.py b/src/dataloader/api/__init__.py
index b027fcd..490e546 100644
--- a/src/dataloader/api/__init__.py
+++ b/src/dataloader/api/__init__.py
@@ -1,3 +1,6 @@
+# src/dataloader/api/__init__.py
+from __future__ import annotations
+
 from collections.abc import AsyncGenerator
 import contextlib
 import typing as tp
@@ -8,31 +11,35 @@ from .metric_router import router as metric_router
 from .middleware import log_requests
 from .os_router import router as service_router
 from .v1 import router as v1_router
+from dataloader.context import APP_CTX
+from dataloader.workers.manager import build_manager_from_env, WorkerManager
+
+
+_manager: WorkerManager | None = None


 @contextlib.asynccontextmanager
 async def lifespan(app: tp.Any) -> AsyncGenerator[None, None]:
-    from dataloader.context import APP_CTX
-
+    """
+    Application lifespan: initialize the context, then start and stop the worker manager.
+    """
+    global _manager
     await APP_CTX.on_startup()
-    yield
-    await APP_CTX.on_shutdown()
+    _manager = build_manager_from_env()
+    await _manager.start()
+    try:
+        yield
+    finally:
+        if _manager is not None:
+            await _manager.stop()
+            _manager = None
+        await APP_CTX.on_shutdown()


 app_main = FastAPI(title="Data Gateway", lifespan=lifespan)
-
 app_main.middleware("http")(log_requests)

+app_main.include_router(service_router, tags=["Openshift dataloader routes"])
+app_main.include_router(metric_router, tags=["Like/dislike metric dataloader routes"])
+app_main.include_router(v1_router, prefix="/api/v1", tags=["dataloader"])

-app_main.include_router(
-    service_router, tags=["Openshift dataloader routes"]
-)
-app_main.include_router(
-    metric_router, tags=["Like/dislike metric dataloader routes"]
-)
-app_main.include_router(
-    v1_router, prefix="/api/v1", tags=["dataloader"]
-)
-
-__all__ = [
-    "app_main",
-]
+__all__ = ["app_main"]
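For context on what build_manager_from_env (manager.py below) consumes: each entry in WORKERS_JSON is expected to provide a queue name and an optional concurrency; empty queue names are skipped and concurrency is clamped to at least 1. A plausible value, assuming the config layer exposes the variable unmodified through APP_CONFIG.dl.parsed_workers() and with illustrative queue names, is:

    WORKERS_JSON='[{"queue": "noop", "concurrency": 2}, {"queue": "reports", "concurrency": 1}]'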
diff --git a/src/dataloader/workers/base.py b/src/dataloader/workers/base.py
index 55b4f4f..7c490a1 100644
--- a/src/dataloader/workers/base.py
+++ b/src/dataloader/workers/base.py
@@ -1,71 +1,110 @@
-"""Generic PG worker: claim/lease/heartbeat/retry."""
+# src/dataloader/workers/base.py
+from __future__ import annotations
 
 import asyncio
-from typing import Optional
-from uuid import UUID
-from sqlalchemy.ext.asyncio import AsyncSession
-from sqlalchemy import text
-
-from ..config import Settings, PGSettings
-from ..storage.db import Database
-from .pipelines.registry import PipelineRegistry
-
-
-class BaseWorker:
-    """Base worker that processes jobs from the queue."""
-
-    def __init__(
-        self,
-        queue: str,
-        settings: Settings,
-        shutdown_event: asyncio.Event,
-    ):
-        self.queue = queue
-        self.settings = settings
-        self.shutdown_event = shutdown_event
-        self.db: Optional[Database] = None
-        self.current_job_id: Optional[UUID] = None
-        self.current_lock_key: Optional[str] = None
-
-    async def run(self):
-        """Main worker loop."""
-        # TODO: initialize the DB, subscribe to LISTEN/NOTIFY
-        # TODO: claim -> heartbeat -> execute -> finish loop
-        raise NotImplementedError
-
-    async def claim_job(self, session: AsyncSession) -> Optional[dict]:
-        """
-        Claim a single job from the queue.
-        Returns a dict with job_id, task, args, lock_key, or None.
-        """
-        # TODO: implement SELECT ... FOR UPDATE SKIP LOCKED
-        # TODO: check the advisory lock on lock_key
-        # TODO: back off if the advisory lock is taken
-        raise NotImplementedError
-
-    async def heartbeat(self, session: AsyncSession, job_id: UUID):
-        """Update the job's heartbeat."""
-        # TODO: UPDATE heartbeat_at and lease_expires_at
-        raise NotImplementedError
-
-    async def finish_job(
-        self,
-        session: AsyncSession,
-        job_id: UUID,
-        success: bool,
-        error: Optional[str] = None,
-    ):
-        """Finish a job (success, or failure with retry)."""
-        # TODO: UPDATE status depending on success and attempt
-        # TODO: release the advisory lock
-        raise NotImplementedError
-
-    async def check_cancel_requested(
-        self,
-        session: AsyncSession,
-        job_id: UUID,
-    ) -> bool:
-        """Check the cancel_requested flag."""
-        # TODO: SELECT cancel_requested
-        raise NotImplementedError
+from contextlib import AsyncExitStack
+from dataclasses import dataclass
+from datetime import datetime, timedelta, timezone
+from typing import AsyncIterator, Callable
+
+from dataloader.context import APP_CTX
+from dataloader.storage.db import get_sessionmaker
+from dataloader.storage.repositories import QueueRepository
+from dataloader.workers.pipelines.registry import resolve as resolve_pipeline
+
+
+@dataclass(frozen=True)
+class WorkerConfig:
+    """
+    Worker configuration.
+    """
+    queue: str
+    heartbeat_sec: int
+    claim_backoff_sec: int
+
+
+class PGWorker:
+    """
+    Base asynchronous worker for the Postgres-backed queue.
+    """
+    def __init__(self, cfg: WorkerConfig, stop_event: asyncio.Event) -> None:
+        self._cfg = cfg
+        self._stop = stop_event
+        self._log = APP_CTX.get_logger()
+        self._sm = get_sessionmaker()
+
+    async def run(self) -> None:
+        """
+        Main loop: wait -> claim -> execute -> finish.
+        """
+        self._log.info(f"worker.start queue={self._cfg.queue}")
+        while not self._stop.is_set():
+            claimed = await self._claim_and_execute_once()
+            if not claimed:
+                await self._listen_or_sleep(self._cfg.claim_backoff_sec)
+        self._log.info(f"worker.stop queue={self._cfg.queue}")
+
+    async def _listen_or_sleep(self, timeout_sec: int) -> None:
+        """
+        Sleep until the backoff timeout expires or shutdown is requested.
+        """
+        try:
+            await asyncio.wait_for(self._stop.wait(), timeout=timeout_sec)
+        except asyncio.TimeoutError:
+            return
+
+    async def _claim_and_execute_once(self) -> bool:
+        """
+        Make a single attempt to claim a job and process it.
+        Returns True if a job was claimed, False if the queue was empty.
+        """
+        async with AsyncExitStack() as stack:
+            s = await stack.enter_async_context(self._sm())
+            repo = QueueRepository(s)
+            row = await repo.claim_one(self._cfg.queue)
+            if not row:
+                await s.commit()
+                return False
+
+            job_id = row["job_id"]
+            ttl = int(row["lease_ttl_sec"])
+            task = row["task"]
+            args = row["args"]
+
+            try:
+                await self._execute_with_heartbeat(job_id, ttl, self._pipeline(task, args))
+                await repo.finish_ok(job_id)
+                return True
+            except asyncio.CancelledError:
+                await repo.finish_fail_or_retry(job_id, "cancelled")
+                raise
+            except Exception as e:
+                await repo.finish_fail_or_retry(job_id, str(e))
+                return True
+
+    async def _execute_with_heartbeat(self, job_id: str, ttl: int, it: AsyncIterator[None]) -> None:
+        """
+        Run the pipeline, renewing the lease via heartbeat between steps.
+        """
+        next_hb = datetime.now(timezone.utc) + timedelta(seconds=self._cfg.heartbeat_sec)
+        async for _ in it:
+            if datetime.now(timezone.utc) >= next_hb:
+                async with self._sm() as s_hb:
+                    await QueueRepository(s_hb).heartbeat(job_id, ttl)
+                next_hb = datetime.now(timezone.utc) + timedelta(seconds=self._cfg.heartbeat_sec)
+            if self._stop.is_set():
+                raise asyncio.CancelledError()
+
+    async def _pipeline(self, task: str, args: dict) -> AsyncIterator[None]:
+        """
+        Invoke the registered pipeline for the task name, adapting async
+        generators, coroutines and plain callables to one iterator shape.
+        """
+        fn: Callable[[dict], object] = resolve_pipeline(task)
+        res = fn(args)
+        if hasattr(res, "__aiter__"):
+            async for _ in res:  # type: ignore[func-returns-value]
+                yield
+        elif asyncio.iscoroutine(res):
+            await res  # type: ignore[arg-type]
+            yield
+        else:
+            yield
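The _pipeline adapter above accepts three handler shapes. A minimal sketch, assuming the register decorator from pipelines/registry.py further below; the task names and bodies are illustrative:

    from dataloader.workers.pipelines.registry import register

    @register("demo.steps")
    async def demo_steps(args: dict):
        # Async generator: each yield is a step boundary where the
        # worker may send a heartbeat and observe the stop event.
        for _ in range(int(args.get("steps", 3))):
            yield

    @register("demo.oneshot")
    async def demo_oneshot(args: dict) -> None:
        # Plain coroutine: awaited as a single step.
        ...

    @register("demo.sync")
    def demo_sync(args: dict) -> None:
        # Plain callable: runs synchronously when invoked, single step.
        ...

Since heartbeats fire only between yields, a single-step handler renews its lease only on completion, so its lease_ttl_sec must comfortably exceed its worst-case runtime.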
+ """ + next_hb = datetime.now(timezone.utc) + timedelta(seconds=self._cfg.heartbeat_sec) + async for _ in it: + if datetime.now(timezone.utc) >= next_hb: + async with self._sm() as s_hb: + await QueueRepository(s_hb).heartbeat(job_id, ttl) + next_hb = datetime.now(timezone.utc) + timedelta(seconds=self._cfg.heartbeat_sec) + if self._stop.is_set(): + raise asyncio.CancelledError() + + async def _pipeline(self, task: str, args: dict) -> AsyncIterator[None]: + """ + Вызывает зарегистрированный пайплайн по имени задачи. + """ + fn: Callable[[dict], object] = resolve_pipeline(task) + res = fn(args) + if hasattr(res, "__aiter__"): + async for _ in res: # type: ignore[func-returns-value] + yield + elif asyncio.iscoroutine(res): + await res # type: ignore[arg-type] + yield + else: + yield diff --git a/src/dataloader/workers/manager.py b/src/dataloader/workers/manager.py index 898d30c..802a403 100644 --- a/src/dataloader/workers/manager.py +++ b/src/dataloader/workers/manager.py @@ -1,53 +1,106 @@ -"""Создание asyncio Tasks воркеров по конфигу.""" +# src/dataloader/workers/manager.py +from __future__ import annotations import asyncio -import json -from typing import List, Dict, Any -from ..config import Settings, PGSettings -from .base import BaseWorker +import contextlib +from dataclasses import dataclass +from typing import Any + +from dataloader.context import APP_CTX +from dataloader.config import APP_CONFIG +from dataloader.storage.db import get_sessionmaker +from dataloader.storage.repositories import QueueRepository +from dataloader.workers.base import PGWorker, WorkerConfig + + +@dataclass(frozen=True) +class WorkerSpec: + """ + Конфигурация набора воркеров для очереди. + """ + queue: str + concurrency: int class WorkerManager: - """Менеджер воркеров: создание и управление воркерами по конфигу.""" - - def __init__(self, settings: Settings): - self.settings = settings - self.workers: List[asyncio.Task] = [] - self.worker_configs: List[Dict[str, Any]] = [] - self._shutdown_event = asyncio.Event() - - def _parse_workers_config(self) -> List[Dict[str, Any]]: - """Парсинг WORKERS_JSON из конфига.""" - if not self.settings.workers_json: - return [] - try: - return json.loads(self.settings.workers_json) - except json.JSONDecodeError: - # TODO: логирование ошибки - return [] - - async def start(self): - """Запуск всех воркеров.""" - self.worker_configs = self._parse_workers_config() - for config in self.worker_configs: - queue = config.get("queue") - concurrency = config.get("concurrency", 1) - - for _ in range(concurrency): - worker = BaseWorker( - queue=queue, - settings=self.settings, - shutdown_event=self._shutdown_event, - ) - task = asyncio.create_task(worker.run()) - self.workers.append(task) - - async def shutdown(self, timeout: float = 30.0): - """Мягкая остановка воркеров.""" - self._shutdown_event.set() - # Ждем завершения всех воркеров с таймаутом - await asyncio.wait_for( - asyncio.gather(*self.workers, return_exceptions=True), - timeout=timeout, + """ + Управляет жизненным циклом асинхронных воркеров. + """ + def __init__(self, specs: list[WorkerSpec]) -> None: + self._log = APP_CTX.get_logger() + self._specs = specs + self._stop = asyncio.Event() + self._tasks: list[asyncio.Task] = [] + self._reaper_task: asyncio.Task | None = None + + async def start(self) -> None: + """ + Стартует воркеры и фоновую задачу реапера. 
+ """ + hb = int(APP_CONFIG.dl.dl_heartbeat_sec) + backoff = int(APP_CONFIG.dl.dl_claim_backoff_sec) + + for spec in self._specs: + for i in range(max(1, spec.concurrency)): + cfg = WorkerConfig(queue=spec.queue, heartbeat_sec=hb, claim_backoff_sec=backoff) + t = asyncio.create_task(PGWorker(cfg, self._stop).run(), name=f"worker:{spec.queue}:{i}") + self._tasks.append(t) + + self._reaper_task = asyncio.create_task(self._reaper_loop(), name="reaper") + + self._log.info( + "worker_manager.started", + extra={"specs": [spec.__dict__ for spec in self._specs], "total_tasks": len(self._tasks)}, ) + async def stop(self) -> None: + """ + Останавливает воркеры и реапер. + """ + self._stop.set() + + for t in self._tasks: + t.cancel() + await asyncio.gather(*self._tasks, return_exceptions=True) + self._tasks.clear() + + if self._reaper_task: + self._reaper_task.cancel() + with contextlib.suppress(Exception): + await self._reaper_task + self._reaper_task = None + + self._log.info("worker_manager.stopped") + + async def _reaper_loop(self) -> None: + """ + Фоновый цикл возврата потерянных задач в очередь. + """ + period = int(APP_CONFIG.dl.dl_reaper_period_sec) + sm = get_sessionmaker() + while not self._stop.is_set(): + try: + async with sm() as s: + repo = QueueRepository(s) + ids = await repo.requeue_lost() + if ids: + APP_CTX.get_logger().info("reaper.requeued", extra={"count": len(ids)}) + except Exception as e: + APP_CTX.get_logger().error("reaper.error", extra={"error": str(e)}) + try: + await asyncio.wait_for(self._stop.wait(), timeout=period) + except asyncio.TimeoutError: + continue + + +def build_manager_from_env() -> WorkerManager: + """ + Собирает WorkerManager из WORKERS_JSON. + """ + specs: list[WorkerSpec] = [] + for item in APP_CONFIG.dl.parsed_workers(): + q = str(item.get("queue", "")).strip() + c = int(item.get("concurrency", 1)) + if q: + specs.append(WorkerSpec(queue=q, concurrency=max(1, c))) + return WorkerManager(specs) diff --git a/src/dataloader/workers/pipelines/noop.py b/src/dataloader/workers/pipelines/noop.py new file mode 100644 index 0000000..4b20c00 --- /dev/null +++ b/src/dataloader/workers/pipelines/noop.py @@ -0,0 +1,20 @@ +# src/dataloader/workers/pipelines/noop.py +from __future__ import annotations + +import asyncio +from typing import AsyncIterator + +from .registry import register + + +@register("noop") +async def noop(args: dict) -> AsyncIterator[None]: + """ + Эталонный пайплайн без побочных эффектов, имитирует 3 шага. + """ + await asyncio.sleep(float(args.get("sleep1", 2))) + yield + await asyncio.sleep(float(args.get("sleep2", 2))) + yield + await asyncio.sleep(float(args.get("sleep3", 2))) + yield diff --git a/src/dataloader/workers/pipelines/registry.py b/src/dataloader/workers/pipelines/registry.py index e20d28c..e7e0fc5 100644 --- a/src/dataloader/workers/pipelines/registry.py +++ b/src/dataloader/workers/pipelines/registry.py @@ -1,47 +1,33 @@ -"""Реестр обработчиков по task.""" +# src/dataloader/workers/pipelines/registry.py +from __future__ import annotations -from typing import Dict, Callable, Any, Awaitable, Optional -from abc import ABC, abstractmethod +from typing import Any, Callable, Dict, Iterable + +_Registry: Dict[str, Callable[[dict[str, Any]], Any]] = {} -class Pipeline(ABC): - """Базовый класс для пайплайна обработки задачи.""" - - @abstractmethod - async def execute(self, args: Dict[str, Any]) -> Dict[str, Any]: - """ - Выполнение задачи. - Возвращает результат (например, progress). 
- """ - pass +def register(task: str) -> Callable[[Callable[[dict[str, Any]], Any]], Callable[[dict[str, Any]], Any]]: + """ + Регистрирует обработчик пайплайна под именем задачи. + """ + def _wrap(fn: Callable[[dict[str, Any]], Any]) -> Callable[[dict[str, Any]], Any]: + _Registry[task] = fn + return fn + return _wrap -class PipelineRegistry: - """Реестр обработчиков задач по типу task.""" - - def __init__(self): - self._pipelines: Dict[str, type[Pipeline]] = {} - - def register(self, task: str, pipeline_class: type[Pipeline]): - """Регистрация обработчика для типа задачи.""" - self._pipelines[task] = pipeline_class - - def get_pipeline(self, task: str) -> Optional[type[Pipeline]]: - """Получить класс обработчика для типа задачи.""" - return self._pipelines.get(task) - - async def execute( - self, - task: str, - args: Dict[str, Any], - ) -> Dict[str, Any]: - """ - Выполнить задачу через зарегистрированный обработчик. - """ - pipeline_class = self.get_pipeline(task) - if pipeline_class is None: - raise ValueError(f"Pipeline for task '{task}' not found") - - pipeline = pipeline_class() - return await pipeline.execute(args) +def resolve(task: str) -> Callable[[dict[str, Any]], Any]: + """ + Возвращает обработчик пайплайна по имени задачи. + """ + try: + return _Registry[task] + except KeyError: + raise KeyError(f"pipeline not found: {task}") + +def tasks() -> Iterable[str]: + """ + Возвращает список зарегистрированных задач. + """ + return _Registry.keys()