From fbcdbac6a048d690d69e6c2dd578b0dc9831bdef Mon Sep 17 00:00:00 2001 From: itqop Date: Wed, 5 Nov 2025 02:24:27 +0300 Subject: [PATCH] fix: try to fix --- src/dataloader/context.py | 13 ++++++++++ src/dataloader/storage/repositories.py | 35 +++++++++++++------------- src/dataloader/workers/manager.py | 14 +++++------ src/dataloader/workers/reaper.py | 15 +++++++++++ 4 files changed, 52 insertions(+), 25 deletions(-) create mode 100644 src/dataloader/workers/reaper.py diff --git a/src/dataloader/context.py b/src/dataloader/context.py index 9bd913d..1d8d402 100644 --- a/src/dataloader/context.py +++ b/src/dataloader/context.py @@ -8,6 +8,7 @@ from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker from dataloader.base import Singleton from dataloader.config import APP_CONFIG, Secrets from dataloader.logger import ContextVarsContainer, LoggerConfigurator +from sqlalchemy import event class AppContext(metaclass=Singleton): @@ -86,6 +87,18 @@ class AppContext(metaclass=Singleton): max_overflow=self.pg.max_overflow if self.pg.use_pool else 0, pool_recycle=self.pg.pool_recycle if self.pg.use_pool else -1, ) + + schema = self.pg.schema_.strip() + + @event.listens_for(self._engine.sync_engine, "connect") + def _set_search_path(dbapi_conn, _): + cur = dbapi_conn.cursor() + try: + if schema: + cur.execute(f'SET SESSION search_path TO "{schema}"') + finally: + cur.close() + self._sessionmaker = async_sessionmaker(self._engine, expire_on_commit=False, class_=AsyncSession) self.logger.info("All connections checked. Application is up and ready.") diff --git a/src/dataloader/storage/repositories.py b/src/dataloader/storage/repositories.py index 1ba963c..7e87de2 100644 --- a/src/dataloader/storage/repositories.py +++ b/src/dataloader/storage/repositories.py @@ -295,24 +295,25 @@ class QueueRepository: Возвращает протухшие running-задачи в очередь. """ now = now or datetime.now(timezone.utc) - q = ( - select(DLJob) - .where( - DLJob.status == "running", - DLJob.lease_expires_at.is_not(None), - DLJob.lease_expires_at < now, + async with self.s.begin(): + q = ( + select(DLJob) + .where( + DLJob.status == "running", + DLJob.lease_expires_at.is_not(None), + DLJob.lease_expires_at < now, + ) + .with_for_update(skip_locked=True) ) - .with_for_update(skip_locked=True) - ) - r = await self.s.execute(q) - rows = list(r.scalars().all()) - ids: list[str] = [] - for job in rows: - job.status = "queued" - job.available_at = now - job.lease_expires_at = None - ids.append(job.job_id) - await self._append_event(job.job_id, job.queue, "requeue_lost", None) + r = await self.s.execute(q) + rows = list(r.scalars().all()) + ids: list[str] = [] + for job in rows: + job.status = "queued" + job.available_at = now + job.lease_expires_at = None + ids.append(job.job_id) + await self._append_event(job.job_id, job.queue, "requeue_lost", None) await self.s.commit() return ids diff --git a/src/dataloader/workers/manager.py b/src/dataloader/workers/manager.py index 802a403..6678826 100644 --- a/src/dataloader/workers/manager.py +++ b/src/dataloader/workers/manager.py @@ -4,13 +4,12 @@ from __future__ import annotations import asyncio import contextlib from dataclasses import dataclass -from typing import Any from dataloader.context import APP_CTX from dataloader.config import APP_CONFIG from dataloader.storage.db import get_sessionmaker -from dataloader.storage.repositories import QueueRepository from dataloader.workers.base import PGWorker, WorkerConfig +from dataloader.workers.reaper import requeue_lost @dataclass(frozen=True) @@ -66,7 +65,7 @@ class WorkerManager: if self._reaper_task: self._reaper_task.cancel() - with contextlib.suppress(Exception): + with contextlib.suppress(asyncio.CancelledError, Exception): await self._reaper_task self._reaper_task = None @@ -81,12 +80,11 @@ class WorkerManager: while not self._stop.is_set(): try: async with sm() as s: - repo = QueueRepository(s) - ids = await repo.requeue_lost() + ids = await requeue_lost(s) if ids: - APP_CTX.get_logger().info("reaper.requeued", extra={"count": len(ids)}) - except Exception as e: - APP_CTX.get_logger().error("reaper.error", extra={"error": str(e)}) + self._log.info("reaper.requeued", extra={"count": len(ids)}) + except Exception: + self._log.exception("reaper.error") try: await asyncio.wait_for(self._stop.wait(), timeout=period) except asyncio.TimeoutError: diff --git a/src/dataloader/workers/reaper.py b/src/dataloader/workers/reaper.py new file mode 100644 index 0000000..0b72ddd --- /dev/null +++ b/src/dataloader/workers/reaper.py @@ -0,0 +1,15 @@ +# src/dataloader/workers/reaper.py +from __future__ import annotations + +from typing import Sequence +from sqlalchemy.ext.asyncio import AsyncSession + +from dataloader.storage.repositories import QueueRepository + + +async def requeue_lost(session: AsyncSession) -> Sequence[str]: + """ + Возвращает протухшие running-задачи в очередь и отдаёт их job_id. + """ + repo = QueueRepository(session) + return await repo.requeue_lost()