From 7a76dc1d84abc1c4d7c380d71b55d5df958eb817 Mon Sep 17 00:00:00 2001 From: itqop Date: Wed, 5 Nov 2025 03:22:56 +0300 Subject: [PATCH] critical fixes --- src/dataloader/api/v1/router.py | 7 +- src/dataloader/api/v1/service.py | 14 +++- src/dataloader/storage/notify_listener.py | 84 +++++++++++++++++++++++ src/dataloader/storage/repositories.py | 45 ++++++++---- src/dataloader/workers/base.py | 81 +++++++++++++++++----- 5 files changed, 192 insertions(+), 39 deletions(-) create mode 100644 src/dataloader/storage/notify_listener.py diff --git a/src/dataloader/api/v1/router.py b/src/dataloader/api/v1/router.py index 4b48f10..3b89ee8 100644 --- a/src/dataloader/api/v1/router.py +++ b/src/dataloader/api/v1/router.py @@ -9,7 +9,6 @@ from uuid import UUID from fastapi import APIRouter, Depends, HTTPException from dataloader.api.v1.schemas import ( - CancelJobResponse, JobStatusResponse, TriggerJobRequest, TriggerJobResponse, @@ -18,7 +17,7 @@ from dataloader.api.v1.service import JobsService from dataloader.storage.db import session_scope -router = APIRouter(prefix="/api/v1/jobs", tags=["jobs"]) +router = APIRouter(prefix="/jobs", tags=["jobs"]) async def get_service() -> AsyncGenerator[JobsService, None]: @@ -54,11 +53,11 @@ async def get_status( return st -@router.post("/{job_id}/cancel", response_model=CancelJobResponse, status_code=HTTPStatus.OK) +@router.post("/{job_id}/cancel", response_model=JobStatusResponse, status_code=HTTPStatus.OK) async def cancel_job( job_id: UUID, svc: Annotated[JobsService, Depends(get_service)], -) -> CancelJobResponse: +) -> JobStatusResponse: """ Запрашивает отмену задачи. """ diff --git a/src/dataloader/api/v1/service.py b/src/dataloader/api/v1/service.py index dd75782..ca8f5fb 100644 --- a/src/dataloader/api/v1/service.py +++ b/src/dataloader/api/v1/service.py @@ -8,7 +8,6 @@ from uuid import UUID from sqlalchemy.ext.asyncio import AsyncSession from dataloader.api.v1.schemas import ( - CancelJobResponse, JobStatusResponse, TriggerJobRequest, TriggerJobResponse, @@ -72,7 +71,7 @@ class JobsService: progress=st.progress or {}, ) - async def cancel(self, job_id: UUID) -> Optional[CancelJobResponse]: + async def cancel(self, job_id: UUID) -> Optional[JobStatusResponse]: """ Запрашивает отмену задачи и возвращает её текущее состояние. """ @@ -80,4 +79,13 @@ class JobsService: st = await self._repo.get_status(str(job_id)) if not st: return None - return CancelJobResponse(job_id=UUID(st.job_id), status=st.status) + return JobStatusResponse( + job_id=UUID(st.job_id), + status=st.status, + attempt=st.attempt, + started_at=st.started_at, + finished_at=st.finished_at, + heartbeat_at=st.heartbeat_at, + error=st.error, + progress=st.progress or {}, + ) diff --git a/src/dataloader/storage/notify_listener.py b/src/dataloader/storage/notify_listener.py new file mode 100644 index 0000000..dcb6177 --- /dev/null +++ b/src/dataloader/storage/notify_listener.py @@ -0,0 +1,84 @@ +# src/dataloader/storage/notify_listener.py +from __future__ import annotations + +import asyncio +import asyncpg +from typing import Callable, Optional + +from dataloader.config import APP_CONFIG + + +class PGNotifyListener: + """ + Прослушиватель PostgreSQL NOTIFY для канала 'dl_jobs'. + """ + def __init__(self, queue: str, callback: Callable[[], None], stop_event: asyncio.Event): + self._queue = queue + self._callback = callback + self._stop = stop_event + self._conn: Optional[asyncpg.Connection] = None + self._task: Optional[asyncio.Task] = None + self._on_notify_handler: Optional[Callable] = None + + async def start(self) -> None: + """ + Запускает прослушивание уведомлений. + """ + dsn = APP_CONFIG.resolved_dsn + # Преобразуем SQLAlchemy DSN в asyncpg DSN + if dsn.startswith("postgresql+asyncpg://"): + dsn = dsn.replace("postgresql+asyncpg://", "postgresql://") + + self._conn = await asyncpg.connect(dsn) + + def on_notify(connection, pid, channel, payload): + # Проверяем, что это уведомление для нашей очереди + # payload содержит имя очереди из триггера + # Callback вызывается из потока asyncpg, поэтому используем asyncio.ensure_future + if channel == "dl_jobs" and payload == self._queue: + try: + # Event.set() потокобезопасен + self._callback() + except Exception: + # Игнорируем ошибки в callback, чтобы не сломать listener + pass + + self._on_notify_handler = on_notify + # Сначала выполняем LISTEN, затем добавляем listener + await self._conn.execute("LISTEN dl_jobs") + await self._conn.add_listener("dl_jobs", self._on_notify_handler) + + # Запускаем задачу для мониторинга соединения + self._task = asyncio.create_task(self._monitor_connection()) + + async def _monitor_connection(self) -> None: + """ + Мониторит соединение и останавливает при stop_event. + """ + try: + await self._stop.wait() + finally: + await self.stop() + + async def stop(self) -> None: + """ + Останавливает прослушивание и закрывает соединение. + """ + if self._task and not self._task.done(): + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + + if self._conn and self._on_notify_handler: + try: + await self._conn.remove_listener("dl_jobs", self._on_notify_handler) + except Exception: + pass + try: + await self._conn.close() + except Exception: + pass + self._conn = None + diff --git a/src/dataloader/storage/repositories.py b/src/dataloader/storage/repositories.py index 01e22ca..c8e5e80 100644 --- a/src/dataloader/storage/repositories.py +++ b/src/dataloader/storage/repositories.py @@ -195,7 +195,7 @@ class QueueRepository: await self.s.commit() return True - async def claim_one(self, queue: str) -> Optional[dict[str, Any]]: + async def claim_one(self, queue: str, claim_backoff_sec: int) -> Optional[dict[str, Any]]: """ Захватывает одну задачу из очереди с учётом блокировок и выставляет running. """ @@ -225,7 +225,7 @@ class QueueRepository: ok = await self._try_advisory_lock(job.lock_key) if not ok: job.status = "queued" - job.available_at = datetime.now(timezone.utc) + timedelta(seconds=15) + job.available_at = datetime.now(timezone.utc) + timedelta(seconds=claim_backoff_sec) return None await self._append_event(job.job_id, job.queue, "picked", {"attempt": job.attempt}) @@ -241,10 +241,16 @@ class QueueRepository: "attempt": int(job.attempt), } - async def heartbeat(self, job_id: str, ttl_sec: int) -> None: + async def heartbeat(self, job_id: str, ttl_sec: int) -> tuple[bool, bool]: """ Обновляет heartbeat и продлевает lease. + Возвращает (success, cancel_requested). """ + job = await self._get(job_id) + if not job or job.status != "running": + return False, False + + cancel_requested = bool(job.cancel_requested) now = datetime.now(timezone.utc) q = ( update(DLJob) @@ -254,6 +260,7 @@ class QueueRepository: await self.s.execute(q) await self._append_event(job_id, await self._resolve_queue(job_id), "heartbeat", {"ttl": ttl_sec}) await self.s.commit() + return True, cancel_requested async def finish_ok(self, job_id: str) -> None: """ @@ -269,26 +276,34 @@ class QueueRepository: await self._advisory_unlock(job.lock_key) await self.s.commit() - async def finish_fail_or_retry(self, job_id: str, err: str) -> None: + async def finish_fail_or_retry(self, job_id: str, err: str, is_canceled: bool = False) -> None: """ - Помечает задачу как failed или возвращает в очередь с задержкой. + Помечает задачу как failed, canceled или возвращает в очередь с задержкой. """ job = await self._get(job_id) if not job: return - can_retry = int(job.attempt) < int(job.max_attempts) - if can_retry: - job.status = "queued" - job.available_at = datetime.now(timezone.utc) + timedelta(seconds=30 * int(job.attempt)) - job.error = err - job.lease_expires_at = None - await self._append_event(job_id, job.queue, "requeue", {"attempt": job.attempt, "error": err}) - else: - job.status = "failed" + + if is_canceled: + job.status = "canceled" job.error = err job.finished_at = datetime.now(timezone.utc) job.lease_expires_at = None - await self._append_event(job_id, job.queue, "failed", {"error": err}) + await self._append_event(job_id, job.queue, "canceled", {"error": err}) + else: + can_retry = int(job.attempt) < int(job.max_attempts) + if can_retry: + job.status = "queued" + job.available_at = datetime.now(timezone.utc) + timedelta(seconds=30 * int(job.attempt)) + job.error = err + job.lease_expires_at = None + await self._append_event(job_id, job.queue, "requeue", {"attempt": job.attempt, "error": err}) + else: + job.status = "failed" + job.error = err + job.finished_at = datetime.now(timezone.utc) + job.lease_expires_at = None + await self._append_event(job_id, job.queue, "failed", {"error": err}) await self._advisory_unlock(job.lock_key) await self.s.commit() diff --git a/src/dataloader/workers/base.py b/src/dataloader/workers/base.py index 7c490a1..0aec038 100644 --- a/src/dataloader/workers/base.py +++ b/src/dataloader/workers/base.py @@ -10,6 +10,7 @@ from typing import AsyncIterator, Callable, Optional from dataloader.context import APP_CTX from dataloader.storage.db import get_sessionmaker from dataloader.storage.repositories import QueueRepository +from dataloader.storage.notify_listener import PGNotifyListener from dataloader.workers.pipelines.registry import resolve as resolve_pipeline @@ -32,26 +33,64 @@ class PGWorker: self._stop = stop_event self._log = APP_CTX.get_logger() self._sm = get_sessionmaker() + self._notify_wakeup = asyncio.Event() + self._listener: Optional[PGNotifyListener] = None async def run(self) -> None: """ Главный цикл: ожидание → claim → исполнение → завершение. """ self._log.info(f"worker.start queue={self._cfg.queue}") - while not self._stop.is_set(): - claimed = await self._claim_and_execute_once() - if not claimed: - await self._listen_or_sleep(self._cfg.claim_backoff_sec) + + # Запускаем LISTEN/NOTIFY + self._listener = PGNotifyListener( + self._cfg.queue, + lambda: self._notify_wakeup.set(), + self._stop + ) + try: + await self._listener.start() + except Exception as e: + self._log.warning(f"Failed to start LISTEN/NOTIFY, falling back to polling: {e}") + self._listener = None + + try: + while not self._stop.is_set(): + claimed = await self._claim_and_execute_once() + if not claimed: + await self._listen_or_sleep(self._cfg.claim_backoff_sec) + finally: + if self._listener: + await self._listener.stop() self._log.info(f"worker.stop queue={self._cfg.queue}") async def _listen_or_sleep(self, timeout_sec: int) -> None: """ - Ожидание появления задач с тайм-аутом. + Ожидание появления задач через LISTEN/NOTIFY или с тайм-аутом. """ - try: - await asyncio.wait_for(self._stop.wait(), timeout=timeout_sec) - except asyncio.TimeoutError: - return + if self._listener: + # Используем LISTEN/NOTIFY с fallback на таймаут + done, pending = await asyncio.wait( + [asyncio.create_task(self._notify_wakeup.wait()), asyncio.create_task(self._stop.wait())], + return_when=asyncio.FIRST_COMPLETED, + timeout=timeout_sec + ) + # Отменяем оставшиеся задачи + for task in pending: + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + # Очищаем событие, если оно было установлено + if self._notify_wakeup.is_set(): + self._notify_wakeup.clear() + else: + # Fallback на простой таймаут + try: + await asyncio.wait_for(self._stop.wait(), timeout=timeout_sec) + except asyncio.TimeoutError: + return async def _claim_and_execute_once(self) -> bool: """ @@ -60,7 +99,7 @@ class PGWorker: async with AsyncExitStack() as stack: s = await stack.enter_async_context(self._sm()) repo = QueueRepository(s) - row = await repo.claim_one(self._cfg.queue) + row = await repo.claim_one(self._cfg.queue, self._cfg.claim_backoff_sec) if not row: await s.commit() return False @@ -71,28 +110,36 @@ class PGWorker: args = row["args"] try: - await self._execute_with_heartbeat(job_id, ttl, self._pipeline(task, args)) - await repo.finish_ok(job_id) + canceled = await self._execute_with_heartbeat(job_id, ttl, self._pipeline(task, args)) + if canceled: + await repo.finish_fail_or_retry(job_id, "canceled by user", is_canceled=True) + else: + await repo.finish_ok(job_id) return True except asyncio.CancelledError: - await repo.finish_fail_or_retry(job_id, "cancelled") + await repo.finish_fail_or_retry(job_id, "cancelled by shutdown", is_canceled=True) raise except Exception as e: await repo.finish_fail_or_retry(job_id, str(e)) return True - async def _execute_with_heartbeat(self, job_id: str, ttl: int, it: AsyncIterator[None]) -> None: + async def _execute_with_heartbeat(self, job_id: str, ttl: int, it: AsyncIterator[None]) -> bool: """ Исполняет конвейер с поддержкой heartbeat. + Возвращает True, если задача была отменена (cancel_requested). """ next_hb = datetime.now(timezone.utc) + timedelta(seconds=self._cfg.heartbeat_sec) async for _ in it: - if datetime.now(timezone.utc) >= next_hb: + now = datetime.now(timezone.utc) + if now >= next_hb: async with self._sm() as s_hb: - await QueueRepository(s_hb).heartbeat(job_id, ttl) - next_hb = datetime.now(timezone.utc) + timedelta(seconds=self._cfg.heartbeat_sec) + success, cancel_requested = await QueueRepository(s_hb).heartbeat(job_id, ttl) + if cancel_requested: + return True + next_hb = now + timedelta(seconds=self._cfg.heartbeat_sec) if self._stop.is_set(): raise asyncio.CancelledError() + return False async def _pipeline(self, task: str, args: dict) -> AsyncIterator[None]: """