from __future__ import annotations

import asyncio
from datetime import datetime, timedelta, timezone
from uuid import uuid4

import pytest
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from dataloader.storage.models import DLJob
from dataloader.storage.repositories import QueueRepository
from dataloader.storage.schemas import CreateJobRequest


@pytest.mark.integration
class TestQueueRepository:
    """
    Integration tests for QueueRepository.
    """

    async def test_create_or_get_creates_new_job(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        Creating a new job in the queue.
        """
        repo = QueueRepository(db_session)
        req = CreateJobRequest(
            job_id=job_id,
            queue=queue_name,
            task=task_name,
            args={"param": "value"},
            idempotency_key="test_key_1",
            lock_key="test_lock_1",
            partition_key="part1",
            priority=100,
            available_at=datetime.now(timezone.utc),
            max_attempts=5,
            lease_ttl_sec=60,
            producer="test_producer",
            consumer_group="test_group",
        )

        created_id, status = await repo.create_or_get(req)

        assert created_id == job_id
        assert status == "queued"

    async def test_create_or_get_returns_existing_job(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        Idempotency: a repeated call returns the existing job.
        """
        repo = QueueRepository(db_session)
        req = CreateJobRequest(
            job_id=job_id,
            queue=queue_name,
            task=task_name,
            args={},
            idempotency_key="idempotent_key_1",
            lock_key="lock1",
            partition_key="",
            priority=100,
            available_at=datetime.now(timezone.utc),
            max_attempts=5,
            lease_ttl_sec=60,
            producer=None,
            consumer_group=None,
        )
        created_id_1, status_1 = await repo.create_or_get(req)

        # Same idempotency_key with a different payload: the original job must be returned.
        req_2 = CreateJobRequest(
            job_id="different_job_id",
            queue="different_queue",
            task="different_task",
            args={},
            idempotency_key="idempotent_key_1",
            lock_key="lock2",
            partition_key="",
            priority=200,
            available_at=datetime.now(timezone.utc),
            max_attempts=3,
            lease_ttl_sec=30,
            producer=None,
            consumer_group=None,
        )
        created_id_2, status_2 = await repo.create_or_get(req_2)

        assert created_id_1 == created_id_2 == job_id
        assert status_1 == status_2 == "queued"

    async def test_get_status_returns_job_status(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        Getting the status of a job.
        """
        repo = QueueRepository(db_session)
        req = CreateJobRequest(
            job_id=job_id,
            queue=queue_name,
            task=task_name,
            args={"key": "val"},
            idempotency_key=None,
            lock_key="lock",
            partition_key="",
            priority=100,
            available_at=datetime.now(timezone.utc),
            max_attempts=5,
            lease_ttl_sec=60,
            producer=None,
            consumer_group=None,
        )
        await repo.create_or_get(req)

        status = await repo.get_status(job_id)

        assert status is not None
        assert status.job_id == job_id
        assert status.status == "queued"
        assert status.attempt == 0
        assert status.error is None

    async def test_get_status_returns_none_for_nonexistent_job(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
    ):
        """
        Getting the status of a nonexistent job.
        """
        repo = QueueRepository(db_session)

        status = await repo.get_status("00000000-0000-0000-0000-000000000000")

        assert status is None

    async def test_cancel_sets_cancel_requested_flag(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        Setting the cancel-requested flag.
        """
""" repo = QueueRepository(db_session) req = CreateJobRequest( job_id=job_id, queue=queue_name, task=task_name, args={}, idempotency_key=None, lock_key="lock", partition_key="", priority=100, available_at=datetime.now(timezone.utc), max_attempts=5, lease_ttl_sec=60, producer=None, consumer_group=None, ) await repo.create_or_get(req) result = await repo.cancel(job_id) assert result is True status = await repo.get_status(job_id) assert status is not None async def test_claim_one_returns_job_for_processing( self, db_session: AsyncSession, clean_queue_tables, job_id: str, queue_name: str, task_name: str, ): """ Тест захвата задачи для обработки. """ repo = QueueRepository(db_session) req = CreateJobRequest( job_id=job_id, queue=queue_name, task=task_name, args={"data": "test"}, idempotency_key=None, lock_key="lock_claim", partition_key="partition1", priority=50, available_at=datetime.now(timezone.utc), max_attempts=5, lease_ttl_sec=120, producer=None, consumer_group=None, ) await repo.create_or_get(req) claimed = await repo.claim_one(queue_name, claim_backoff_sec=15) assert claimed is not None assert claimed["job_id"] == job_id assert claimed["queue"] == queue_name assert claimed["task"] == task_name assert claimed["args"] == {"data": "test"} assert claimed["lock_key"] == "lock_claim" assert claimed["attempt"] == 1 status = await repo.get_status(job_id) assert status is not None assert status.status == "running" assert status.attempt == 1 async def test_claim_one_returns_none_when_no_jobs( self, db_session: AsyncSession, clean_queue_tables, queue_name: str, ): """ Тест захвата при пустой очереди. """ repo = QueueRepository(db_session) claimed = await repo.claim_one(queue_name, claim_backoff_sec=15) assert claimed is None async def test_heartbeat_updates_lease( self, db_session: AsyncSession, clean_queue_tables, job_id: str, queue_name: str, task_name: str, ): """ Тест обновления heartbeat и продления lease. """ repo = QueueRepository(db_session) req = CreateJobRequest( job_id=job_id, queue=queue_name, task=task_name, args={}, idempotency_key=None, lock_key="lock_hb", partition_key="", priority=100, available_at=datetime.now(timezone.utc), max_attempts=5, lease_ttl_sec=60, producer=None, consumer_group=None, ) await repo.create_or_get(req) await repo.claim_one(queue_name, claim_backoff_sec=15) success, cancel_requested = await repo.heartbeat(job_id, ttl_sec=90) assert success is True assert cancel_requested is False async def test_finish_ok_marks_job_succeeded( self, db_session: AsyncSession, clean_queue_tables, job_id: str, queue_name: str, task_name: str, ): """ Тест успешного завершения задачи. """ repo = QueueRepository(db_session) req = CreateJobRequest( job_id=job_id, queue=queue_name, task=task_name, args={}, idempotency_key=None, lock_key="lock_finish", partition_key="", priority=100, available_at=datetime.now(timezone.utc), max_attempts=5, lease_ttl_sec=60, producer=None, consumer_group=None, ) await repo.create_or_get(req) await repo.claim_one(queue_name, claim_backoff_sec=15) await repo.finish_ok(job_id) status = await repo.get_status(job_id) assert status is not None assert status.status == "succeeded" assert status.finished_at is not None async def test_finish_fail_or_retry_requeues_on_retry( self, db_session: AsyncSession, clean_queue_tables, job_id: str, queue_name: str, task_name: str, ): """ Тест повторной постановки при ошибке с возможностью retry. 
""" repo = QueueRepository(db_session) req = CreateJobRequest( job_id=job_id, queue=queue_name, task=task_name, args={}, idempotency_key=None, lock_key="lock_retry", partition_key="", priority=100, available_at=datetime.now(timezone.utc), max_attempts=3, lease_ttl_sec=60, producer=None, consumer_group=None, ) await repo.create_or_get(req) await repo.claim_one(queue_name, claim_backoff_sec=15) await repo.finish_fail_or_retry(job_id, err="Test error") status = await repo.get_status(job_id) assert status is not None assert status.status == "queued" assert status.error == "Test error" assert status.attempt == 1 async def test_finish_fail_or_retry_marks_failed_when_max_attempts_reached( self, db_session: AsyncSession, clean_queue_tables, job_id: str, queue_name: str, task_name: str, ): """ Тест финальной ошибки при достижении max_attempts. """ repo = QueueRepository(db_session) req = CreateJobRequest( job_id=job_id, queue=queue_name, task=task_name, args={}, idempotency_key=None, lock_key="lock_fail", partition_key="", priority=100, available_at=datetime.now(timezone.utc), max_attempts=1, lease_ttl_sec=60, producer=None, consumer_group=None, ) await repo.create_or_get(req) await repo.claim_one(queue_name, claim_backoff_sec=15) await repo.finish_fail_or_retry(job_id, err="Final error") status = await repo.get_status(job_id) assert status is not None assert status.status == "failed" assert status.error == "Final error" assert status.finished_at is not None async def test_requeue_lost_returns_expired_jobs( self, db_session: AsyncSession, clean_queue_tables, job_id: str, queue_name: str, task_name: str, ): """ Тест reaper - возврат протухших задач в очередь. """ repo = QueueRepository(db_session) req = CreateJobRequest( job_id=job_id, queue=queue_name, task=task_name, args={}, idempotency_key=None, lock_key="lock_lost", partition_key="", priority=100, available_at=datetime.now(timezone.utc), max_attempts=5, lease_ttl_sec=1, producer=None, consumer_group=None, ) await repo.create_or_get(req) await repo.claim_one(queue_name, claim_backoff_sec=15) import asyncio await asyncio.sleep(2) requeued = await repo.requeue_lost() assert job_id in requeued status = await repo.get_status(job_id) assert status is not None assert status.status == "queued" async def test_claim_one_fails_on_advisory_lock_and_sets_backoff( self, db_session: AsyncSession, clean_queue_tables, job_id: str, queue_name: str, task_name: str, ): """ Проверка ветки отказа advisory-lock: задача возвращается в queued с отложенным available_at. 
""" repo = QueueRepository(db_session) req = CreateJobRequest( job_id=job_id, queue=queue_name, task=task_name, args={"k": "v"}, idempotency_key=None, lock_key="lock-fail-adv", partition_key="", priority=10, available_at=datetime.now(timezone.utc), max_attempts=5, lease_ttl_sec=30, producer=None, consumer_group=None, ) await repo.create_or_get(req) async def _false_lock(_: str) -> bool: return False repo._try_advisory_lock = _false_lock # type: ignore[method-assign] before = datetime.now(timezone.utc) claimed = await repo.claim_one(queue_name, claim_backoff_sec=15) after = datetime.now(timezone.utc) assert claimed is None st = await repo.get_status(job_id) assert st is not None assert st.status == "queued" row = ( await db_session.execute(select(DLJob).where(DLJob.job_id == job_id)) ).scalar_one() assert row.available_at >= before + timedelta(seconds=15) assert row.available_at <= after + timedelta(seconds=60) async def test_heartbeat_when_not_running_returns_false( self, db_session: AsyncSession, clean_queue_tables, job_id: str, queue_name: str, task_name: str, ): """ Heartbeat для нерunning задачи возвращает (False, False). """ repo = QueueRepository(db_session) req = CreateJobRequest( job_id=job_id, queue=queue_name, task=task_name, args={}, idempotency_key=None, lock_key="lock-hb-not-running", partition_key="", priority=100, available_at=datetime.now(timezone.utc), max_attempts=5, lease_ttl_sec=60, producer=None, consumer_group=None, ) await repo.create_or_get(req) ok, cancel = await repo.heartbeat(job_id, ttl_sec=30) assert ok is False assert cancel is False async def test_finish_fail_or_retry_marks_canceled_branch( self, db_session: AsyncSession, clean_queue_tables, job_id: str, queue_name: str, task_name: str, ): """ Ветка is_canceled=True помечает задачу как canceled и завершает её. """ repo = QueueRepository(db_session) req = CreateJobRequest( job_id=job_id, queue=queue_name, task=task_name, args={}, idempotency_key=None, lock_key="lock-cancel", partition_key="", priority=100, available_at=datetime.now(timezone.utc), max_attempts=5, lease_ttl_sec=60, producer=None, consumer_group=None, ) await repo.create_or_get(req) await repo.claim_one(queue_name, claim_backoff_sec=5) await repo.finish_fail_or_retry( job_id, err="Canceled by test", is_canceled=True ) st = await repo.get_status(job_id) assert st is not None assert st.status == "canceled" assert st.error == "Canceled by test" assert st.finished_at is not None async def test_requeue_lost_no_expired_returns_empty( self, db_session: AsyncSession, clean_queue_tables, job_id: str, queue_name: str, task_name: str, ): """ requeue_lost без протухших задач возвращает пустой список. """ repo = QueueRepository(db_session) req = CreateJobRequest( job_id=job_id, queue=queue_name, task=task_name, args={}, idempotency_key=None, lock_key="lock-none-expired", partition_key="", priority=100, available_at=datetime.now(timezone.utc), max_attempts=5, lease_ttl_sec=120, producer=None, consumer_group=None, ) await repo.create_or_get(req) await repo.claim_one(queue_name, claim_backoff_sec=5) res = await repo.requeue_lost(now=datetime.now(timezone.utc)) assert res == [] st = await repo.get_status(job_id) assert st is not None assert st.status == "running" async def test_private_helpers_resolve_queue_and_advisory_unlock_are_executable( self, db_session: AsyncSession, clean_queue_tables, job_id: str, queue_name: str, task_name: str, ): """ Прямые прогоны приватных методов для покрытия редких веток. 
""" repo = QueueRepository(db_session) rq = CreateJobRequest( job_id=job_id, queue=queue_name, task=task_name, args={}, idempotency_key=None, lock_key="lock-direct-unlock", partition_key="", priority=1, available_at=datetime.now(timezone.utc), max_attempts=1, lease_ttl_sec=5, producer=None, consumer_group=None, ) await repo.create_or_get(rq) missing_uuid = str(uuid4()) qname = await repo._resolve_queue(missing_uuid) # type: ignore[attr-defined] assert qname == "" await repo._advisory_unlock("lock-direct-unlock") # type: ignore[attr-defined] async def test_cancel_returns_false_for_nonexistent_job( self, db_session: AsyncSession, clean_queue_tables, ): """ Возвращает False при отмене несуществующей задачи. """ repo = QueueRepository(db_session) assert await repo.cancel(str(uuid4())) is False async def test_finish_ok_silent_when_job_absent( self, db_session: AsyncSession, clean_queue_tables, ): """ Тихо завершается, если задача не найдена. """ repo = QueueRepository(db_session) await repo.finish_ok(str(uuid4())) async def test_finish_fail_or_retry_noop_when_job_absent( self, db_session: AsyncSession, clean_queue_tables, ): """ Тихо выходит при отсутствии задачи. """ repo = QueueRepository(db_session) await repo.finish_fail_or_retry(str(uuid4()), err="no-op")