diff --git a/src/dataloader/storage/models/queue.py b/src/dataloader/storage/models/queue.py index 54aca7b..8da17f9 100644 --- a/src/dataloader/storage/models/queue.py +++ b/src/dataloader/storage/models/queue.py @@ -52,7 +52,7 @@ class DLJob(Base): error: Mapped[Optional[str]] = mapped_column(Text) producer: Mapped[Optional[str]] = mapped_column(Text) consumer_group: Mapped[Optional[str]] = mapped_column(Text) - created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + load_dttm: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) started_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True)) finished_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True)) @@ -68,6 +68,6 @@ class DLJobEvent(Base): event_id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) job_id: Mapped[str] = mapped_column(UUID(as_uuid=False), nullable=False) queue: Mapped[str] = mapped_column(Text, nullable=False) - ts: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + load_dttm: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) kind: Mapped[str] = mapped_column(Text, nullable=False) payload: Mapped[Optional[dict[str, Any]]] = mapped_column(JSONB) diff --git a/src/dataloader/storage/repositories/queue.py b/src/dataloader/storage/repositories/queue.py index 98f0efd..2b3e0da 100644 --- a/src/dataloader/storage/repositories/queue.py +++ b/src/dataloader/storage/repositories/queue.py @@ -57,7 +57,7 @@ class QueueRepository: error=None, producer=req.producer, consumer_group=req.consumer_group, - created_at=datetime.now(timezone.utc), + load_dttm=datetime.now(timezone.utc), started_at=None, finished_at=None, ) @@ -137,7 +137,7 @@ class QueueRepository: DLJob.queue == queue, DLJob.available_at <= func.now(), ) - .order_by(DLJob.priority.asc(), DLJob.created_at.asc()) + .order_by(DLJob.priority.asc(), DLJob.load_dttm.asc()) .with_for_update(skip_locked=True) .limit(1) ) @@ -323,7 +323,7 @@ class QueueRepository: ev = DLJobEvent( job_id=job_id, queue=queue or "", - ts=datetime.now(timezone.utc), + load_dttm=datetime.now(timezone.utc), kind=kind, payload=payload or None, )