# tests/integration_tests/test_queue_repository.py
from __future__ import annotations

import asyncio
from datetime import datetime, timezone

import pytest
from sqlalchemy.ext.asyncio import AsyncSession

from dataloader.storage.repositories import QueueRepository
from dataloader.storage.schemas import CreateJobRequest, JobStatus


def _make_request(
    job_id: str,
    queue: str,
    task: str,
    *,
    lock_key: str,
    args: dict | None = None,
    idempotency_key: str | None = None,
    partition_key: str = "",
    priority: int = 100,
    max_attempts: int = 5,
    lease_ttl_sec: int = 60,
    producer: str | None = None,
    consumer_group: str | None = None,
) -> CreateJobRequest:
    """
    Build a CreateJobRequest that becomes available immediately.

    The defaults are the values most tests in this module share, so each
    test only spells out the fields it actually exercises.
    ``available_at`` is always set to the current UTC time at call time.
    """
    return CreateJobRequest(
        job_id=job_id,
        queue=queue,
        task=task,
        # A fresh dict per call so requests never share a mutable default.
        args={} if args is None else args,
        idempotency_key=idempotency_key,
        lock_key=lock_key,
        partition_key=partition_key,
        priority=priority,
        available_at=datetime.now(timezone.utc),
        max_attempts=max_attempts,
        lease_ttl_sec=lease_ttl_sec,
        producer=producer,
        consumer_group=consumer_group,
    )


@pytest.mark.integration
class TestQueueRepository:
    """
    Integration tests for QueueRepository.
    """

    async def test_create_or_get_creates_new_job(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        Creating a new job in the queue returns its id and "queued" status.
        """
        repo = QueueRepository(db_session)
        req = _make_request(
            job_id,
            queue_name,
            task_name,
            args={"param": "value"},
            idempotency_key="test_key_1",
            lock_key="test_lock_1",
            partition_key="part1",
            producer="test_producer",
            consumer_group="test_group",
        )

        created_id, status = await repo.create_or_get(req)

        assert created_id == job_id
        assert status == "queued"

    async def test_create_or_get_returns_existing_job(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        Idempotency: a repeated call with the same idempotency key returns
        the existing job.
        """
        repo = QueueRepository(db_session)
        req = _make_request(
            job_id,
            queue_name,
            task_name,
            idempotency_key="idempotent_key_1",
            lock_key="lock1",
        )
        created_id_1, status_1 = await repo.create_or_get(req)

        # Every field differs except the idempotency key.
        req_2 = _make_request(
            "different_job_id",
            "different_queue",
            "different_task",
            idempotency_key="idempotent_key_1",
            lock_key="lock2",
            priority=200,
            max_attempts=3,
            lease_ttl_sec=30,
        )
        created_id_2, status_2 = await repo.create_or_get(req_2)

        assert created_id_1 == created_id_2 == job_id
        assert status_1 == status_2 == "queued"

    async def test_get_status_returns_job_status(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        Fetching the status of a freshly created job.
        """
        repo = QueueRepository(db_session)
        req = _make_request(
            job_id,
            queue_name,
            task_name,
            args={"key": "val"},
            lock_key="lock",
        )
        await repo.create_or_get(req)

        status = await repo.get_status(job_id)

        assert status is not None
        assert status.job_id == job_id
        assert status.status == "queued"
        assert status.attempt == 0
        assert status.error is None

    async def test_get_status_returns_none_for_nonexistent_job(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
    ):
        """
        Fetching the status of a job that does not exist returns None.
        """
        repo = QueueRepository(db_session)

        status = await repo.get_status("00000000-0000-0000-0000-000000000000")

        assert status is None

    async def test_cancel_sets_cancel_requested_flag(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        Requesting cancellation of an existing job succeeds.
        """
        repo = QueueRepository(db_session)
        req = _make_request(job_id, queue_name, task_name, lock_key="lock")
        await repo.create_or_get(req)

        result = await repo.cancel(job_id)

        assert result is True
        # NOTE(review): only the presence of the status is checked here; the
        # cancel flag itself is never asserted - consider verifying it.
        status = await repo.get_status(job_id)
        assert status is not None

    async def test_claim_one_returns_job_for_processing(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        Claiming a job for processing returns its data and marks it running.
        """
        repo = QueueRepository(db_session)
        req = _make_request(
            job_id,
            queue_name,
            task_name,
            args={"data": "test"},
            lock_key="lock_claim",
            partition_key="partition1",
            priority=50,
            lease_ttl_sec=120,
        )
        await repo.create_or_get(req)

        claimed = await repo.claim_one(queue_name, claim_backoff_sec=15)

        assert claimed is not None
        assert claimed["job_id"] == job_id
        assert claimed["queue"] == queue_name
        assert claimed["task"] == task_name
        assert claimed["args"] == {"data": "test"}
        assert claimed["lock_key"] == "lock_claim"
        assert claimed["attempt"] == 1

        status = await repo.get_status(job_id)
        assert status is not None
        assert status.status == "running"
        assert status.attempt == 1

    async def test_claim_one_returns_none_when_no_jobs(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        queue_name: str,
    ):
        """
        Claiming from an empty queue returns None.
        """
        repo = QueueRepository(db_session)

        claimed = await repo.claim_one(queue_name, claim_backoff_sec=15)

        assert claimed is None

    async def test_heartbeat_updates_lease(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        Heartbeat on a claimed job succeeds and reports no cancel request.
        """
        repo = QueueRepository(db_session)
        req = _make_request(job_id, queue_name, task_name, lock_key="lock_hb")
        await repo.create_or_get(req)
        await repo.claim_one(queue_name, claim_backoff_sec=15)

        success, cancel_requested = await repo.heartbeat(job_id, ttl_sec=90)

        assert success is True
        assert cancel_requested is False

    async def test_finish_ok_marks_job_succeeded(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        Successful completion of a claimed job.
        """
        repo = QueueRepository(db_session)
        req = _make_request(
            job_id, queue_name, task_name, lock_key="lock_finish"
        )
        await repo.create_or_get(req)
        await repo.claim_one(queue_name, claim_backoff_sec=15)

        await repo.finish_ok(job_id)

        status = await repo.get_status(job_id)
        assert status is not None
        assert status.status == "succeeded"
        assert status.finished_at is not None

    async def test_finish_fail_or_retry_requeues_on_retry(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        A failed job with attempts remaining is put back into the queue.
        """
        repo = QueueRepository(db_session)
        req = _make_request(
            job_id,
            queue_name,
            task_name,
            lock_key="lock_retry",
            max_attempts=3,
        )
        await repo.create_or_get(req)
        await repo.claim_one(queue_name, claim_backoff_sec=15)

        await repo.finish_fail_or_retry(job_id, err="Test error")

        status = await repo.get_status(job_id)
        assert status is not None
        assert status.status == "queued"
        assert status.error == "Test error"
        assert status.attempt == 1

    async def test_finish_fail_or_retry_marks_failed_when_max_attempts_reached(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        A failed job becomes terminally failed once max_attempts is reached.
        """
        repo = QueueRepository(db_session)
        req = _make_request(
            job_id,
            queue_name,
            task_name,
            lock_key="lock_fail",
            max_attempts=1,
        )
        await repo.create_or_get(req)
        await repo.claim_one(queue_name, claim_backoff_sec=15)

        await repo.finish_fail_or_retry(job_id, err="Final error")

        status = await repo.get_status(job_id)
        assert status is not None
        assert status.status == "failed"
        assert status.error == "Final error"
        assert status.finished_at is not None

    async def test_requeue_lost_returns_expired_jobs(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        Reaper: jobs whose lease has expired are returned to the queue.
        """
        repo = QueueRepository(db_session)
        req = _make_request(
            job_id,
            queue_name,
            task_name,
            lock_key="lock_lost",
            lease_ttl_sec=1,
        )
        await repo.create_or_get(req)
        await repo.claim_one(queue_name, claim_backoff_sec=15)

        # Wait longer than the 1-second lease so the job counts as lost.
        await asyncio.sleep(2)

        requeued = await repo.requeue_lost()

        assert job_id in requeued
        status = await repo.get_status(job_id)
        assert status is not None
        assert status.status == "queued"