critical fixes
This commit is contained in:
parent
6d52bcbbe7
commit
7a76dc1d84
|
|
@ -9,7 +9,6 @@ from uuid import UUID
|
||||||
from fastapi import APIRouter, Depends, HTTPException
|
from fastapi import APIRouter, Depends, HTTPException
|
||||||
|
|
||||||
from dataloader.api.v1.schemas import (
|
from dataloader.api.v1.schemas import (
|
||||||
CancelJobResponse,
|
|
||||||
JobStatusResponse,
|
JobStatusResponse,
|
||||||
TriggerJobRequest,
|
TriggerJobRequest,
|
||||||
TriggerJobResponse,
|
TriggerJobResponse,
|
||||||
|
|
@ -18,7 +17,7 @@ from dataloader.api.v1.service import JobsService
|
||||||
from dataloader.storage.db import session_scope
|
from dataloader.storage.db import session_scope
|
||||||
|
|
||||||
|
|
||||||
router = APIRouter(prefix="/api/v1/jobs", tags=["jobs"])
|
router = APIRouter(prefix="/jobs", tags=["jobs"])
|
||||||
|
|
||||||
|
|
||||||
async def get_service() -> AsyncGenerator[JobsService, None]:
|
async def get_service() -> AsyncGenerator[JobsService, None]:
|
||||||
|
|
@ -54,11 +53,11 @@ async def get_status(
|
||||||
return st
|
return st
|
||||||
|
|
||||||
|
|
||||||
@router.post("/{job_id}/cancel", response_model=CancelJobResponse, status_code=HTTPStatus.OK)
|
@router.post("/{job_id}/cancel", response_model=JobStatusResponse, status_code=HTTPStatus.OK)
|
||||||
async def cancel_job(
|
async def cancel_job(
|
||||||
job_id: UUID,
|
job_id: UUID,
|
||||||
svc: Annotated[JobsService, Depends(get_service)],
|
svc: Annotated[JobsService, Depends(get_service)],
|
||||||
) -> CancelJobResponse:
|
) -> JobStatusResponse:
|
||||||
"""
|
"""
|
||||||
Запрашивает отмену задачи.
|
Запрашивает отмену задачи.
|
||||||
"""
|
"""
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,6 @@ from uuid import UUID
|
||||||
from sqlalchemy.ext.asyncio import AsyncSession
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
from dataloader.api.v1.schemas import (
|
from dataloader.api.v1.schemas import (
|
||||||
CancelJobResponse,
|
|
||||||
JobStatusResponse,
|
JobStatusResponse,
|
||||||
TriggerJobRequest,
|
TriggerJobRequest,
|
||||||
TriggerJobResponse,
|
TriggerJobResponse,
|
||||||
|
|
@ -72,7 +71,7 @@ class JobsService:
|
||||||
progress=st.progress or {},
|
progress=st.progress or {},
|
||||||
)
|
)
|
||||||
|
|
||||||
async def cancel(self, job_id: UUID) -> Optional[CancelJobResponse]:
|
async def cancel(self, job_id: UUID) -> Optional[JobStatusResponse]:
|
||||||
"""
|
"""
|
||||||
Запрашивает отмену задачи и возвращает её текущее состояние.
|
Запрашивает отмену задачи и возвращает её текущее состояние.
|
||||||
"""
|
"""
|
||||||
|
|
@ -80,4 +79,13 @@ class JobsService:
|
||||||
st = await self._repo.get_status(str(job_id))
|
st = await self._repo.get_status(str(job_id))
|
||||||
if not st:
|
if not st:
|
||||||
return None
|
return None
|
||||||
return CancelJobResponse(job_id=UUID(st.job_id), status=st.status)
|
return JobStatusResponse(
|
||||||
|
job_id=UUID(st.job_id),
|
||||||
|
status=st.status,
|
||||||
|
attempt=st.attempt,
|
||||||
|
started_at=st.started_at,
|
||||||
|
finished_at=st.finished_at,
|
||||||
|
heartbeat_at=st.heartbeat_at,
|
||||||
|
error=st.error,
|
||||||
|
progress=st.progress or {},
|
||||||
|
)
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,84 @@
|
||||||
|
# src/dataloader/storage/notify_listener.py
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import asyncpg
|
||||||
|
from typing import Callable, Optional
|
||||||
|
|
||||||
|
from dataloader.config import APP_CONFIG
|
||||||
|
|
||||||
|
|
||||||
|
class PGNotifyListener:
|
||||||
|
"""
|
||||||
|
Прослушиватель PostgreSQL NOTIFY для канала 'dl_jobs'.
|
||||||
|
"""
|
||||||
|
def __init__(self, queue: str, callback: Callable[[], None], stop_event: asyncio.Event):
|
||||||
|
self._queue = queue
|
||||||
|
self._callback = callback
|
||||||
|
self._stop = stop_event
|
||||||
|
self._conn: Optional[asyncpg.Connection] = None
|
||||||
|
self._task: Optional[asyncio.Task] = None
|
||||||
|
self._on_notify_handler: Optional[Callable] = None
|
||||||
|
|
||||||
|
async def start(self) -> None:
|
||||||
|
"""
|
||||||
|
Запускает прослушивание уведомлений.
|
||||||
|
"""
|
||||||
|
dsn = APP_CONFIG.resolved_dsn
|
||||||
|
# Преобразуем SQLAlchemy DSN в asyncpg DSN
|
||||||
|
if dsn.startswith("postgresql+asyncpg://"):
|
||||||
|
dsn = dsn.replace("postgresql+asyncpg://", "postgresql://")
|
||||||
|
|
||||||
|
self._conn = await asyncpg.connect(dsn)
|
||||||
|
|
||||||
|
def on_notify(connection, pid, channel, payload):
|
||||||
|
# Проверяем, что это уведомление для нашей очереди
|
||||||
|
# payload содержит имя очереди из триггера
|
||||||
|
# Callback вызывается из потока asyncpg, поэтому используем asyncio.ensure_future
|
||||||
|
if channel == "dl_jobs" and payload == self._queue:
|
||||||
|
try:
|
||||||
|
# Event.set() потокобезопасен
|
||||||
|
self._callback()
|
||||||
|
except Exception:
|
||||||
|
# Игнорируем ошибки в callback, чтобы не сломать listener
|
||||||
|
pass
|
||||||
|
|
||||||
|
self._on_notify_handler = on_notify
|
||||||
|
# Сначала выполняем LISTEN, затем добавляем listener
|
||||||
|
await self._conn.execute("LISTEN dl_jobs")
|
||||||
|
await self._conn.add_listener("dl_jobs", self._on_notify_handler)
|
||||||
|
|
||||||
|
# Запускаем задачу для мониторинга соединения
|
||||||
|
self._task = asyncio.create_task(self._monitor_connection())
|
||||||
|
|
||||||
|
async def _monitor_connection(self) -> None:
|
||||||
|
"""
|
||||||
|
Мониторит соединение и останавливает при stop_event.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
await self._stop.wait()
|
||||||
|
finally:
|
||||||
|
await self.stop()
|
||||||
|
|
||||||
|
async def stop(self) -> None:
|
||||||
|
"""
|
||||||
|
Останавливает прослушивание и закрывает соединение.
|
||||||
|
"""
|
||||||
|
if self._task and not self._task.done():
|
||||||
|
self._task.cancel()
|
||||||
|
try:
|
||||||
|
await self._task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if self._conn and self._on_notify_handler:
|
||||||
|
try:
|
||||||
|
await self._conn.remove_listener("dl_jobs", self._on_notify_handler)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
await self._conn.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
self._conn = None
|
||||||
|
|
||||||
|
|
@ -195,7 +195,7 @@ class QueueRepository:
|
||||||
await self.s.commit()
|
await self.s.commit()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
async def claim_one(self, queue: str) -> Optional[dict[str, Any]]:
|
async def claim_one(self, queue: str, claim_backoff_sec: int) -> Optional[dict[str, Any]]:
|
||||||
"""
|
"""
|
||||||
Захватывает одну задачу из очереди с учётом блокировок и выставляет running.
|
Захватывает одну задачу из очереди с учётом блокировок и выставляет running.
|
||||||
"""
|
"""
|
||||||
|
|
@ -225,7 +225,7 @@ class QueueRepository:
|
||||||
ok = await self._try_advisory_lock(job.lock_key)
|
ok = await self._try_advisory_lock(job.lock_key)
|
||||||
if not ok:
|
if not ok:
|
||||||
job.status = "queued"
|
job.status = "queued"
|
||||||
job.available_at = datetime.now(timezone.utc) + timedelta(seconds=15)
|
job.available_at = datetime.now(timezone.utc) + timedelta(seconds=claim_backoff_sec)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
await self._append_event(job.job_id, job.queue, "picked", {"attempt": job.attempt})
|
await self._append_event(job.job_id, job.queue, "picked", {"attempt": job.attempt})
|
||||||
|
|
@ -241,10 +241,16 @@ class QueueRepository:
|
||||||
"attempt": int(job.attempt),
|
"attempt": int(job.attempt),
|
||||||
}
|
}
|
||||||
|
|
||||||
async def heartbeat(self, job_id: str, ttl_sec: int) -> None:
|
async def heartbeat(self, job_id: str, ttl_sec: int) -> tuple[bool, bool]:
|
||||||
"""
|
"""
|
||||||
Обновляет heartbeat и продлевает lease.
|
Обновляет heartbeat и продлевает lease.
|
||||||
|
Возвращает (success, cancel_requested).
|
||||||
"""
|
"""
|
||||||
|
job = await self._get(job_id)
|
||||||
|
if not job or job.status != "running":
|
||||||
|
return False, False
|
||||||
|
|
||||||
|
cancel_requested = bool(job.cancel_requested)
|
||||||
now = datetime.now(timezone.utc)
|
now = datetime.now(timezone.utc)
|
||||||
q = (
|
q = (
|
||||||
update(DLJob)
|
update(DLJob)
|
||||||
|
|
@ -254,6 +260,7 @@ class QueueRepository:
|
||||||
await self.s.execute(q)
|
await self.s.execute(q)
|
||||||
await self._append_event(job_id, await self._resolve_queue(job_id), "heartbeat", {"ttl": ttl_sec})
|
await self._append_event(job_id, await self._resolve_queue(job_id), "heartbeat", {"ttl": ttl_sec})
|
||||||
await self.s.commit()
|
await self.s.commit()
|
||||||
|
return True, cancel_requested
|
||||||
|
|
||||||
async def finish_ok(self, job_id: str) -> None:
|
async def finish_ok(self, job_id: str) -> None:
|
||||||
"""
|
"""
|
||||||
|
|
@ -269,13 +276,21 @@ class QueueRepository:
|
||||||
await self._advisory_unlock(job.lock_key)
|
await self._advisory_unlock(job.lock_key)
|
||||||
await self.s.commit()
|
await self.s.commit()
|
||||||
|
|
||||||
async def finish_fail_or_retry(self, job_id: str, err: str) -> None:
|
async def finish_fail_or_retry(self, job_id: str, err: str, is_canceled: bool = False) -> None:
|
||||||
"""
|
"""
|
||||||
Помечает задачу как failed или возвращает в очередь с задержкой.
|
Помечает задачу как failed, canceled или возвращает в очередь с задержкой.
|
||||||
"""
|
"""
|
||||||
job = await self._get(job_id)
|
job = await self._get(job_id)
|
||||||
if not job:
|
if not job:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if is_canceled:
|
||||||
|
job.status = "canceled"
|
||||||
|
job.error = err
|
||||||
|
job.finished_at = datetime.now(timezone.utc)
|
||||||
|
job.lease_expires_at = None
|
||||||
|
await self._append_event(job_id, job.queue, "canceled", {"error": err})
|
||||||
|
else:
|
||||||
can_retry = int(job.attempt) < int(job.max_attempts)
|
can_retry = int(job.attempt) < int(job.max_attempts)
|
||||||
if can_retry:
|
if can_retry:
|
||||||
job.status = "queued"
|
job.status = "queued"
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,7 @@ from typing import AsyncIterator, Callable, Optional
|
||||||
from dataloader.context import APP_CTX
|
from dataloader.context import APP_CTX
|
||||||
from dataloader.storage.db import get_sessionmaker
|
from dataloader.storage.db import get_sessionmaker
|
||||||
from dataloader.storage.repositories import QueueRepository
|
from dataloader.storage.repositories import QueueRepository
|
||||||
|
from dataloader.storage.notify_listener import PGNotifyListener
|
||||||
from dataloader.workers.pipelines.registry import resolve as resolve_pipeline
|
from dataloader.workers.pipelines.registry import resolve as resolve_pipeline
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -32,22 +33,60 @@ class PGWorker:
|
||||||
self._stop = stop_event
|
self._stop = stop_event
|
||||||
self._log = APP_CTX.get_logger()
|
self._log = APP_CTX.get_logger()
|
||||||
self._sm = get_sessionmaker()
|
self._sm = get_sessionmaker()
|
||||||
|
self._notify_wakeup = asyncio.Event()
|
||||||
|
self._listener: Optional[PGNotifyListener] = None
|
||||||
|
|
||||||
async def run(self) -> None:
|
async def run(self) -> None:
|
||||||
"""
|
"""
|
||||||
Главный цикл: ожидание → claim → исполнение → завершение.
|
Главный цикл: ожидание → claim → исполнение → завершение.
|
||||||
"""
|
"""
|
||||||
self._log.info(f"worker.start queue={self._cfg.queue}")
|
self._log.info(f"worker.start queue={self._cfg.queue}")
|
||||||
|
|
||||||
|
# Запускаем LISTEN/NOTIFY
|
||||||
|
self._listener = PGNotifyListener(
|
||||||
|
self._cfg.queue,
|
||||||
|
lambda: self._notify_wakeup.set(),
|
||||||
|
self._stop
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
await self._listener.start()
|
||||||
|
except Exception as e:
|
||||||
|
self._log.warning(f"Failed to start LISTEN/NOTIFY, falling back to polling: {e}")
|
||||||
|
self._listener = None
|
||||||
|
|
||||||
|
try:
|
||||||
while not self._stop.is_set():
|
while not self._stop.is_set():
|
||||||
claimed = await self._claim_and_execute_once()
|
claimed = await self._claim_and_execute_once()
|
||||||
if not claimed:
|
if not claimed:
|
||||||
await self._listen_or_sleep(self._cfg.claim_backoff_sec)
|
await self._listen_or_sleep(self._cfg.claim_backoff_sec)
|
||||||
|
finally:
|
||||||
|
if self._listener:
|
||||||
|
await self._listener.stop()
|
||||||
self._log.info(f"worker.stop queue={self._cfg.queue}")
|
self._log.info(f"worker.stop queue={self._cfg.queue}")
|
||||||
|
|
||||||
async def _listen_or_sleep(self, timeout_sec: int) -> None:
|
async def _listen_or_sleep(self, timeout_sec: int) -> None:
|
||||||
"""
|
"""
|
||||||
Ожидание появления задач с тайм-аутом.
|
Ожидание появления задач через LISTEN/NOTIFY или с тайм-аутом.
|
||||||
"""
|
"""
|
||||||
|
if self._listener:
|
||||||
|
# Используем LISTEN/NOTIFY с fallback на таймаут
|
||||||
|
done, pending = await asyncio.wait(
|
||||||
|
[asyncio.create_task(self._notify_wakeup.wait()), asyncio.create_task(self._stop.wait())],
|
||||||
|
return_when=asyncio.FIRST_COMPLETED,
|
||||||
|
timeout=timeout_sec
|
||||||
|
)
|
||||||
|
# Отменяем оставшиеся задачи
|
||||||
|
for task in pending:
|
||||||
|
task.cancel()
|
||||||
|
try:
|
||||||
|
await task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
# Очищаем событие, если оно было установлено
|
||||||
|
if self._notify_wakeup.is_set():
|
||||||
|
self._notify_wakeup.clear()
|
||||||
|
else:
|
||||||
|
# Fallback на простой таймаут
|
||||||
try:
|
try:
|
||||||
await asyncio.wait_for(self._stop.wait(), timeout=timeout_sec)
|
await asyncio.wait_for(self._stop.wait(), timeout=timeout_sec)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
|
|
@ -60,7 +99,7 @@ class PGWorker:
|
||||||
async with AsyncExitStack() as stack:
|
async with AsyncExitStack() as stack:
|
||||||
s = await stack.enter_async_context(self._sm())
|
s = await stack.enter_async_context(self._sm())
|
||||||
repo = QueueRepository(s)
|
repo = QueueRepository(s)
|
||||||
row = await repo.claim_one(self._cfg.queue)
|
row = await repo.claim_one(self._cfg.queue, self._cfg.claim_backoff_sec)
|
||||||
if not row:
|
if not row:
|
||||||
await s.commit()
|
await s.commit()
|
||||||
return False
|
return False
|
||||||
|
|
@ -71,28 +110,36 @@ class PGWorker:
|
||||||
args = row["args"]
|
args = row["args"]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await self._execute_with_heartbeat(job_id, ttl, self._pipeline(task, args))
|
canceled = await self._execute_with_heartbeat(job_id, ttl, self._pipeline(task, args))
|
||||||
|
if canceled:
|
||||||
|
await repo.finish_fail_or_retry(job_id, "canceled by user", is_canceled=True)
|
||||||
|
else:
|
||||||
await repo.finish_ok(job_id)
|
await repo.finish_ok(job_id)
|
||||||
return True
|
return True
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
await repo.finish_fail_or_retry(job_id, "cancelled")
|
await repo.finish_fail_or_retry(job_id, "cancelled by shutdown", is_canceled=True)
|
||||||
raise
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
await repo.finish_fail_or_retry(job_id, str(e))
|
await repo.finish_fail_or_retry(job_id, str(e))
|
||||||
return True
|
return True
|
||||||
|
|
||||||
async def _execute_with_heartbeat(self, job_id: str, ttl: int, it: AsyncIterator[None]) -> None:
|
async def _execute_with_heartbeat(self, job_id: str, ttl: int, it: AsyncIterator[None]) -> bool:
|
||||||
"""
|
"""
|
||||||
Исполняет конвейер с поддержкой heartbeat.
|
Исполняет конвейер с поддержкой heartbeat.
|
||||||
|
Возвращает True, если задача была отменена (cancel_requested).
|
||||||
"""
|
"""
|
||||||
next_hb = datetime.now(timezone.utc) + timedelta(seconds=self._cfg.heartbeat_sec)
|
next_hb = datetime.now(timezone.utc) + timedelta(seconds=self._cfg.heartbeat_sec)
|
||||||
async for _ in it:
|
async for _ in it:
|
||||||
if datetime.now(timezone.utc) >= next_hb:
|
now = datetime.now(timezone.utc)
|
||||||
|
if now >= next_hb:
|
||||||
async with self._sm() as s_hb:
|
async with self._sm() as s_hb:
|
||||||
await QueueRepository(s_hb).heartbeat(job_id, ttl)
|
success, cancel_requested = await QueueRepository(s_hb).heartbeat(job_id, ttl)
|
||||||
next_hb = datetime.now(timezone.utc) + timedelta(seconds=self._cfg.heartbeat_sec)
|
if cancel_requested:
|
||||||
|
return True
|
||||||
|
next_hb = now + timedelta(seconds=self._cfg.heartbeat_sec)
|
||||||
if self._stop.is_set():
|
if self._stop.is_set():
|
||||||
raise asyncio.CancelledError()
|
raise asyncio.CancelledError()
|
||||||
|
return False
|
||||||
|
|
||||||
async def _pipeline(self, task: str, args: dict) -> AsyncIterator[None]:
|
async def _pipeline(self, task: str, args: dict) -> AsyncIterator[None]:
|
||||||
"""
|
"""
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue