From 1789270d1777a65a090ce9305071bf093218659f Mon Sep 17 00:00:00 2001 From: itqop Date: Wed, 5 Nov 2025 02:34:15 +0300 Subject: [PATCH] fix: fixes --- src/dataloader/context.py | 7 ++++++- src/dataloader/storage/repositories.py | 23 +++++++++++------------ 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/src/dataloader/context.py b/src/dataloader/context.py index 1d8d402..6f03580 100644 --- a/src/dataloader/context.py +++ b/src/dataloader/context.py @@ -8,7 +8,7 @@ from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker from dataloader.base import Singleton from dataloader.config import APP_CONFIG, Secrets from dataloader.logger import ContextVarsContainer, LoggerConfigurator -from sqlalchemy import event +from sqlalchemy import event, select, func, text class AppContext(metaclass=Singleton): @@ -100,6 +100,11 @@ class AppContext(metaclass=Singleton): cur.close() self._sessionmaker = async_sessionmaker(self._engine, expire_on_commit=False, class_=AsyncSession) + + async with self._sessionmaker() as s: + await s.execute(select(func.count()).select_from(text("dl_jobs"))) + await s.execute(select(func.count()).select_from(text("dl_job_events"))) + self.logger.info("All connections checked. Application is up and ready.") async def on_shutdown(self) -> None: diff --git a/src/dataloader/storage/repositories.py b/src/dataloader/storage/repositories.py index 7e87de2..2bd86be 100644 --- a/src/dataloader/storage/repositories.py +++ b/src/dataloader/storage/repositories.py @@ -5,8 +5,8 @@ from dataclasses import dataclass from datetime import datetime, timedelta, timezone from typing import Any, Optional -from sqlalchemy import BigInteger, String, Text, select, func, update -from sqlalchemy.dialects.postgresql import JSONB, ENUM +from sqlalchemy import BigInteger, String, Text, select, func, update, DateTime +from sqlalchemy.dialects.postgresql import JSONB, ENUM, UUID from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import Mapped, mapped_column @@ -32,7 +32,7 @@ class DLJob(Base): """ __tablename__ = "dl_jobs" - job_id: Mapped[str] = mapped_column(String(36), primary_key=True) + job_id: Mapped[str] = mapped_column(UUID(as_uuid=False), primary_key=True) queue: Mapped[str] = mapped_column(Text, nullable=False) task: Mapped[str] = mapped_column(Text, nullable=False) args: Mapped[dict[str, Any]] = mapped_column(JSONB, default=dict, nullable=False) @@ -40,21 +40,21 @@ class DLJob(Base): lock_key: Mapped[str] = mapped_column(Text, nullable=False) partition_key: Mapped[str] = mapped_column(Text, default="", nullable=False) priority: Mapped[int] = mapped_column(nullable=False, default=100) - available_at: Mapped[datetime] = mapped_column(nullable=False) + available_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) status: Mapped[str] = mapped_column(dl_status_enum, nullable=False, default="queued") attempt: Mapped[int] = mapped_column(nullable=False, default=0) max_attempts: Mapped[int] = mapped_column(nullable=False, default=5) lease_ttl_sec: Mapped[int] = mapped_column(nullable=False, default=60) - lease_expires_at: Mapped[Optional[datetime]] = mapped_column(nullable=True) - heartbeat_at: Mapped[Optional[datetime]] = mapped_column(nullable=True) + lease_expires_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True)) + heartbeat_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True)) cancel_requested: Mapped[bool] = mapped_column(nullable=False, default=False) progress: Mapped[dict[str, Any]] = mapped_column(JSONB, default=dict, nullable=False) error: Mapped[Optional[str]] = mapped_column(Text) producer: Mapped[Optional[str]] = mapped_column(Text) consumer_group: Mapped[Optional[str]] = mapped_column(Text) - created_at: Mapped[datetime] = mapped_column(nullable=False) - started_at: Mapped[Optional[datetime]] = mapped_column(nullable=True) - finished_at: Mapped[Optional[datetime]] = mapped_column(nullable=True) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + started_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True)) + finished_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True)) class DLJobEvent(Base): @@ -64,9 +64,9 @@ class DLJobEvent(Base): __tablename__ = "dl_job_events" event_id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) - job_id: Mapped[str] = mapped_column(String(36), nullable=False) + job_id: Mapped[str] = mapped_column(UUID(as_uuid=False), nullable=False) queue: Mapped[str] = mapped_column(Text, nullable=False) - ts: Mapped[datetime] = mapped_column(nullable=False) + ts: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) kind: Mapped[str] = mapped_column(Text, nullable=False) payload: Mapped[Optional[dict[str, Any]]] = mapped_column(JSONB) @@ -314,7 +314,6 @@ class QueueRepository: job.lease_expires_at = None ids.append(job.job_id) await self._append_event(job.job_id, job.queue, "requeue_lost", None) - await self.s.commit() return ids async def _get(self, job_id: str) -> Optional[DLJob]: