fix: try to fix

itqop 2025-11-05 02:24:27 +03:00
parent ad12343784
commit fbcdbac6a0
4 changed files with 52 additions and 25 deletions

View File

@@ -8,6 +8,7 @@ from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker
 from dataloader.base import Singleton
 from dataloader.config import APP_CONFIG, Secrets
 from dataloader.logger import ContextVarsContainer, LoggerConfigurator
+from sqlalchemy import event


 class AppContext(metaclass=Singleton):
@@ -86,6 +87,18 @@ class AppContext(metaclass=Singleton):
             max_overflow=self.pg.max_overflow if self.pg.use_pool else 0,
             pool_recycle=self.pg.pool_recycle if self.pg.use_pool else -1,
         )
+
+        schema = self.pg.schema_.strip()
+
+        @event.listens_for(self._engine.sync_engine, "connect")
+        def _set_search_path(dbapi_conn, _):
+            cur = dbapi_conn.cursor()
+            try:
+                if schema:
+                    cur.execute(f'SET SESSION search_path TO "{schema}"')
+            finally:
+                cur.close()
+
         self._sessionmaker = async_sessionmaker(self._engine, expire_on_commit=False, class_=AsyncSession)
         self.logger.info("All connections checked. Application is up and ready.")

View File

@@ -295,24 +295,25 @@ class QueueRepository:
         Returns stale running jobs back to the queue.
         """
         now = now or datetime.now(timezone.utc)
-        q = (
-            select(DLJob)
-            .where(
-                DLJob.status == "running",
-                DLJob.lease_expires_at.is_not(None),
-                DLJob.lease_expires_at < now,
+        async with self.s.begin():
+            q = (
+                select(DLJob)
+                .where(
+                    DLJob.status == "running",
+                    DLJob.lease_expires_at.is_not(None),
+                    DLJob.lease_expires_at < now,
+                )
+                .with_for_update(skip_locked=True)
             )
-            .with_for_update(skip_locked=True)
-        )
-        r = await self.s.execute(q)
-        rows = list(r.scalars().all())
-        ids: list[str] = []
-        for job in rows:
-            job.status = "queued"
-            job.available_at = now
-            job.lease_expires_at = None
-            ids.append(job.job_id)
-            await self._append_event(job.job_id, job.queue, "requeue_lost", None)
+            r = await self.s.execute(q)
+            rows = list(r.scalars().all())
+            ids: list[str] = []
+            for job in rows:
+                job.status = "queued"
+                job.available_at = now
+                job.lease_expires_at = None
+                ids.append(job.job_id)
+                await self._append_event(job.job_id, job.queue, "requeue_lost", None)
         await self.s.commit()
         return ids
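The reworked method follows the usual claim-and-update shape: lock candidate rows with FOR UPDATE SKIP LOCKED and mutate them inside one explicit transaction, so concurrent reapers skip each other's rows instead of blocking. A condensed sketch of that shape, using a placeholder Job model rather than the repo's DLJob mapping:

# Condensed claim-and-update sketch; Job is a placeholder mapped class.
from datetime import datetime, timezone

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

async def requeue_stale(session: AsyncSession, Job) -> list[str]:
    now = datetime.now(timezone.utc)
    async with session.begin():  # one transaction: lock, update, commit on clean exit
        rows = (
            await session.execute(
                select(Job)
                .where(Job.status == "running", Job.lease_expires_at < now)
                .with_for_update(skip_locked=True)  # rows locked by another worker are skipped
            )
        ).scalars().all()
        ids: list[str] = []
        for job in rows:
            job.status = "queued"  # flushed and committed when the block exits
            job.lease_expires_at = None
            ids.append(job.job_id)
    return ids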

View File

@@ -4,13 +4,12 @@ from __future__ import annotations
 import asyncio
 import contextlib
 from dataclasses import dataclass
-from typing import Any

 from dataloader.context import APP_CTX
 from dataloader.config import APP_CONFIG
 from dataloader.storage.db import get_sessionmaker
-from dataloader.storage.repositories import QueueRepository
 from dataloader.workers.base import PGWorker, WorkerConfig
+from dataloader.workers.reaper import requeue_lost


 @dataclass(frozen=True)
@@ -66,7 +65,7 @@ class WorkerManager:
         if self._reaper_task:
             self._reaper_task.cancel()
-            with contextlib.suppress(Exception):
+            with contextlib.suppress(asyncio.CancelledError, Exception):
                 await self._reaper_task
             self._reaper_task = None
@@ -81,12 +80,11 @@ class WorkerManager:
         while not self._stop.is_set():
             try:
                 async with sm() as s:
-                    repo = QueueRepository(s)
-                    ids = await repo.requeue_lost()
+                    ids = await requeue_lost(s)
                     if ids:
-                        APP_CTX.get_logger().info("reaper.requeued", extra={"count": len(ids)})
-            except Exception as e:
-                APP_CTX.get_logger().error("reaper.error", extra={"error": str(e)})
+                        self._log.info("reaper.requeued", extra={"count": len(ids)})
+            except Exception:
+                self._log.exception("reaper.error")
             try:
                 await asyncio.wait_for(self._stop.wait(), timeout=period)
             except asyncio.TimeoutError:
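The reaper loop above combines two small asyncio patterns: sleeping on the stop event with wait_for so shutdown wakes it early, and cancelling the background task and awaiting it under contextlib.suppress. A self-contained sketch of just those two patterns; the PeriodicTask class and its method names are illustrative, not from the repo.

# Illustrative stop-event loop plus cancel-and-await shutdown.
import asyncio
import contextlib
from typing import Optional

class PeriodicTask:
    def __init__(self) -> None:
        self._stop = asyncio.Event()
        self._task: Optional[asyncio.Task] = None

    async def _loop(self, period: float) -> None:
        while not self._stop.is_set():
            # ... one unit of periodic work goes here ...
            try:
                # Sleep up to `period`, but wake immediately once stop() is called.
                await asyncio.wait_for(self._stop.wait(), timeout=period)
            except asyncio.TimeoutError:
                pass

    def start(self, period: float) -> None:
        self._task = asyncio.create_task(self._loop(period))

    async def stop(self) -> None:
        self._stop.set()
        if self._task:
            self._task.cancel()
            # The awaited task re-raises CancelledError; suppress it during shutdown.
            with contextlib.suppress(asyncio.CancelledError):
                await self._task
            self._task = None

async def main() -> None:
    pt = PeriodicTask()
    pt.start(period=0.1)
    await asyncio.sleep(0.25)
    await pt.stop()

asyncio.run(main())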

View File

@@ -0,0 +1,15 @@
+# src/dataloader/workers/reaper.py
+from __future__ import annotations
+
+from typing import Sequence
+
+from sqlalchemy.ext.asyncio import AsyncSession
+from dataloader.storage.repositories import QueueRepository
+
+
+async def requeue_lost(session: AsyncSession) -> Sequence[str]:
+    """
+    Requeues stale running jobs and returns their job_id values.
+    """
+    repo = QueueRepository(session)
+    return await repo.requeue_lost()
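A minimal usage sketch for the new helper outside the manager loop, assuming get_sessionmaker() returns the async_sessionmaker configured in AppContext; reap_once is a hypothetical name, not part of the repo.

# Hypothetical one-shot caller for requeue_lost.
from dataloader.storage.db import get_sessionmaker
from dataloader.workers.reaper import requeue_lost

async def reap_once() -> int:
    sm = get_sessionmaker()          # assumed to return the configured async_sessionmaker
    async with sm() as session:
        ids = await requeue_lost(session)
    return len(ids)

# Example (from an async context): count = await reap_once()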