dataloader/tests/integration_tests/test_queue_repository.py

686 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# tests/integration_tests/test_queue_repository.py
from __future__ import annotations

import asyncio
from datetime import datetime, timezone, timedelta
from uuid import uuid4

import pytest
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from dataloader.storage.models import DLJob
from dataloader.storage.repositories import QueueRepository
from dataloader.storage.schemas import CreateJobRequest, JobStatus
@pytest.mark.integration
class TestQueueRepository:
    """Integration tests for QueueRepository.

    Every test builds a repository on top of the ``db_session`` fixture and
    requests the ``clean_queue_tables`` fixture; job requests are created
    through ``_make_request`` so each test spells out only the fields that
    differ from the shared defaults.
    """

    @staticmethod
    def _make_request(
        job_id: str,
        queue: str,
        task: str,
        **overrides: object,
    ) -> CreateJobRequest:
        """Build a CreateJobRequest from shared defaults plus overrides.

        Args:
            job_id: Identifier of the job to create.
            queue: Queue name for the job.
            task: Task name for the job.
            **overrides: Any CreateJobRequest field that should differ from
                the defaults listed below.

        Returns:
            A request whose ``available_at`` is the current UTC time unless
            overridden.
        """
        fields = {
            "job_id": job_id,
            "queue": queue,
            "task": task,
            "args": {},
            "idempotency_key": None,
            "lock_key": "lock",
            "partition_key": "",
            "priority": 100,
            "available_at": datetime.now(timezone.utc),
            "max_attempts": 5,
            "lease_ttl_sec": 60,
            "producer": None,
            "consumer_group": None,
        }
        fields.update(overrides)
        return CreateJobRequest(**fields)

    async def test_create_or_get_creates_new_job(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ) -> None:
        """Creating a new job returns its id and the "queued" status."""
        repo = QueueRepository(db_session)
        req = self._make_request(
            job_id,
            queue_name,
            task_name,
            args={"param": "value"},
            idempotency_key="test_key_1",
            lock_key="test_lock_1",
            partition_key="part1",
            producer="test_producer",
            consumer_group="test_group",
        )

        created_id, status = await repo.create_or_get(req)

        assert created_id == job_id
        assert status == "queued"

    async def test_create_or_get_returns_existing_job(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ) -> None:
        """Idempotency: a repeated call returns the already existing job."""
        repo = QueueRepository(db_session)
        req = self._make_request(
            job_id,
            queue_name,
            task_name,
            idempotency_key="idempotent_key_1",
            lock_key="lock1",
        )
        created_id_1, status_1 = await repo.create_or_get(req)

        # Everything differs except the idempotency key, so the first job
        # is expected to win.
        req_2 = self._make_request(
            "different_job_id",
            "different_queue",
            "different_task",
            idempotency_key="idempotent_key_1",
            lock_key="lock2",
            priority=200,
            max_attempts=3,
            lease_ttl_sec=30,
        )
        created_id_2, status_2 = await repo.create_or_get(req_2)

        assert created_id_1 == created_id_2 == job_id
        assert status_1 == status_2 == "queued"

    async def test_get_status_returns_job_status(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ) -> None:
        """get_status returns the status of a freshly created job."""
        repo = QueueRepository(db_session)
        req = self._make_request(
            job_id,
            queue_name,
            task_name,
            args={"key": "val"},
        )
        await repo.create_or_get(req)

        status = await repo.get_status(job_id)

        assert status is not None
        assert status.job_id == job_id
        assert status.status == "queued"
        assert status.attempt == 0
        assert status.error is None

    async def test_get_status_returns_none_for_nonexistent_job(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
    ) -> None:
        """get_status returns None for a job id that does not exist."""
        repo = QueueRepository(db_session)

        status = await repo.get_status("00000000-0000-0000-0000-000000000000")

        assert status is None

    async def test_cancel_sets_cancel_requested_flag(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ) -> None:
        """cancel() on an existing job returns True."""
        repo = QueueRepository(db_session)
        req = self._make_request(job_id, queue_name, task_name)
        await repo.create_or_get(req)

        result = await repo.cancel(job_id)

        assert result is True
        # NOTE(review): despite the test name, the cancel-requested flag
        # itself is never inspected here; only the presence of a status
        # record is checked. Consider asserting the flag explicitly.
        status = await repo.get_status(job_id)
        assert status is not None

    async def test_claim_one_returns_job_for_processing(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ) -> None:
        """claim_one hands out the queued job and moves it to "running"."""
        repo = QueueRepository(db_session)
        req = self._make_request(
            job_id,
            queue_name,
            task_name,
            args={"data": "test"},
            lock_key="lock_claim",
            partition_key="partition1",
            priority=50,
            lease_ttl_sec=120,
        )
        await repo.create_or_get(req)

        claimed = await repo.claim_one(queue_name, claim_backoff_sec=15)

        assert claimed is not None
        assert claimed["job_id"] == job_id
        assert claimed["queue"] == queue_name
        assert claimed["task"] == task_name
        assert claimed["args"] == {"data": "test"}
        assert claimed["lock_key"] == "lock_claim"
        assert claimed["attempt"] == 1

        status = await repo.get_status(job_id)
        assert status is not None
        assert status.status == "running"
        assert status.attempt == 1

    async def test_claim_one_returns_none_when_no_jobs(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        queue_name: str,
    ) -> None:
        """claim_one returns None when the queue is empty."""
        repo = QueueRepository(db_session)

        claimed = await repo.claim_one(queue_name, claim_backoff_sec=15)

        assert claimed is None

    async def test_heartbeat_updates_lease(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ) -> None:
        """heartbeat on a claimed job reports success and no cancellation."""
        repo = QueueRepository(db_session)
        req = self._make_request(
            job_id,
            queue_name,
            task_name,
            lock_key="lock_hb",
        )
        await repo.create_or_get(req)
        await repo.claim_one(queue_name, claim_backoff_sec=15)

        success, cancel_requested = await repo.heartbeat(job_id, ttl_sec=90)

        # NOTE(review): only the returned tuple is checked; the lease
        # extension implied by the test name is not verified.
        assert success is True
        assert cancel_requested is False

    async def test_finish_ok_marks_job_succeeded(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ) -> None:
        """finish_ok moves a claimed job to "succeeded" with finished_at set."""
        repo = QueueRepository(db_session)
        req = self._make_request(
            job_id,
            queue_name,
            task_name,
            lock_key="lock_finish",
        )
        await repo.create_or_get(req)
        await repo.claim_one(queue_name, claim_backoff_sec=15)

        await repo.finish_ok(job_id)

        status = await repo.get_status(job_id)
        assert status is not None
        assert status.status == "succeeded"
        assert status.finished_at is not None

    async def test_finish_fail_or_retry_requeues_on_retry(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ) -> None:
        """A failure with attempts left puts the job back into "queued"."""
        repo = QueueRepository(db_session)
        req = self._make_request(
            job_id,
            queue_name,
            task_name,
            lock_key="lock_retry",
            max_attempts=3,
        )
        await repo.create_or_get(req)
        await repo.claim_one(queue_name, claim_backoff_sec=15)

        await repo.finish_fail_or_retry(job_id, err="Test error")

        status = await repo.get_status(job_id)
        assert status is not None
        assert status.status == "queued"
        assert status.error == "Test error"
        assert status.attempt == 1

    async def test_finish_fail_or_retry_marks_failed_when_max_attempts_reached(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ) -> None:
        """A failure on the last allowed attempt marks the job "failed"."""
        repo = QueueRepository(db_session)
        req = self._make_request(
            job_id,
            queue_name,
            task_name,
            lock_key="lock_fail",
            max_attempts=1,
        )
        await repo.create_or_get(req)
        await repo.claim_one(queue_name, claim_backoff_sec=15)

        await repo.finish_fail_or_retry(job_id, err="Final error")

        status = await repo.get_status(job_id)
        assert status is not None
        assert status.status == "failed"
        assert status.error == "Final error"
        assert status.finished_at is not None

    async def test_requeue_lost_returns_expired_jobs(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ) -> None:
        """Reaper: a job whose lease has expired is returned to the queue."""
        repo = QueueRepository(db_session)
        req = self._make_request(
            job_id,
            queue_name,
            task_name,
            lock_key="lock_lost",
            lease_ttl_sec=1,
        )
        await repo.create_or_get(req)
        await repo.claim_one(queue_name, claim_backoff_sec=15)

        # Wait longer than the 1-second lease so that requeue_lost(), called
        # without an explicit ``now``, sees the job as expired.
        await asyncio.sleep(2)

        requeued = await repo.requeue_lost()

        assert job_id in requeued
        status = await repo.get_status(job_id)
        assert status is not None
        assert status.status == "queued"

    async def test_claim_one_fails_on_advisory_lock_and_sets_backoff(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ) -> None:
        """Advisory-lock refusal: the job stays queued with a delayed available_at."""
        repo = QueueRepository(db_session)
        req = self._make_request(
            job_id,
            queue_name,
            task_name,
            args={"k": "v"},
            lock_key="lock-fail-adv",
            priority=10,
            lease_ttl_sec=30,
        )
        await repo.create_or_get(req)

        async def _false_lock(_: str) -> bool:
            return False

        # Replace the lock helper on this instance so that acquiring the
        # advisory lock always fails.
        repo._try_advisory_lock = _false_lock  # type: ignore[method-assign]

        before = datetime.now(timezone.utc)
        claimed = await repo.claim_one(queue_name, claim_backoff_sec=15)
        after = datetime.now(timezone.utc)

        assert claimed is None
        st = await repo.get_status(job_id)
        assert st is not None
        assert st.status == "queued"

        row = (
            await db_session.execute(select(DLJob).where(DLJob.job_id == job_id))
        ).scalar_one()
        # Lower bound is the requested 15-second backoff; the upper bound is
        # deliberately loose (60 seconds).
        assert row.available_at >= before + timedelta(seconds=15)
        assert row.available_at <= after + timedelta(seconds=60)

    async def test_heartbeat_when_not_running_returns_false(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ) -> None:
        """heartbeat for a job that was never claimed returns (False, False)."""
        repo = QueueRepository(db_session)
        req = self._make_request(
            job_id,
            queue_name,
            task_name,
            lock_key="lock-hb-not-running",
        )
        await repo.create_or_get(req)

        ok, cancel = await repo.heartbeat(job_id, ttl_sec=30)

        assert ok is False
        assert cancel is False

    async def test_finish_fail_or_retry_marks_canceled_branch(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ) -> None:
        """The is_canceled=True branch marks the job "canceled" and finishes it."""
        repo = QueueRepository(db_session)
        req = self._make_request(
            job_id,
            queue_name,
            task_name,
            lock_key="lock-cancel",
        )
        await repo.create_or_get(req)
        await repo.claim_one(queue_name, claim_backoff_sec=5)

        await repo.finish_fail_or_retry(
            job_id, err="Canceled by test", is_canceled=True
        )

        st = await repo.get_status(job_id)
        assert st is not None
        assert st.status == "canceled"
        assert st.error == "Canceled by test"
        assert st.finished_at is not None

    async def test_requeue_lost_no_expired_returns_empty(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ) -> None:
        """requeue_lost returns an empty list when no lease has expired."""
        repo = QueueRepository(db_session)
        req = self._make_request(
            job_id,
            queue_name,
            task_name,
            lock_key="lock-none-expired",
            lease_ttl_sec=120,
        )
        await repo.create_or_get(req)
        await repo.claim_one(queue_name, claim_backoff_sec=5)

        res = await repo.requeue_lost(now=datetime.now(timezone.utc))

        assert res == []
        st = await repo.get_status(job_id)
        assert st is not None
        assert st.status == "running"

    async def test_private_helpers_resolve_queue_and_advisory_unlock_are_executable(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ) -> None:
        """Call private helpers directly to cover rarely taken branches."""
        repo = QueueRepository(db_session)
        rq = self._make_request(
            job_id,
            queue_name,
            task_name,
            lock_key="lock-direct-unlock",
            priority=1,
            max_attempts=1,
            lease_ttl_sec=5,
        )
        await repo.create_or_get(rq)

        # An id that was never stored is expected to resolve to an empty
        # queue name.
        missing_uuid = str(uuid4())
        qname = await repo._resolve_queue(missing_uuid)  # type: ignore[attr-defined]
        assert qname == ""

        # Only checks that the call completes without raising.
        await repo._advisory_unlock("lock-direct-unlock")  # type: ignore[attr-defined]

    async def test_cancel_returns_false_for_nonexistent_job(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
    ) -> None:
        """cancel() returns False for a job that does not exist."""
        repo = QueueRepository(db_session)

        assert await repo.cancel(str(uuid4())) is False

    async def test_finish_ok_silent_when_job_absent(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
    ) -> None:
        """finish_ok completes without raising when the job is not found."""
        repo = QueueRepository(db_session)

        await repo.finish_ok(str(uuid4()))

    async def test_finish_fail_or_retry_noop_when_job_absent(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
    ) -> None:
        """finish_fail_or_retry completes without raising when the job is absent."""
        repo = QueueRepository(db_session)

        await repo.finish_fail_or_retry(str(uuid4()), err="no-op")