From 309e62c410bba036cdc5814dac633182930c4058 Mon Sep 17 00:00:00 2001
From: itqop
Date: Wed, 5 Nov 2025 18:49:12 +0300
Subject: [PATCH] tests: add new tests

---
Review notes (everything between the "---" line above and the first file
diff below is commentary and does not become part of the commit):

 * The tests/conftest.py hunk that added `pytestmark = pytest.mark.asyncio`
   was dropped. pytest picks up `pytestmark` from test modules and test
   classes; assigning it in a conftest.py does not mark any test.
 * tests/unit/test_workers_base.py: a run of six blank lines between the two
   new tests was reduced to one, the trailing blank line at end of file was
   removed, each new test got a docstring, and a NOTE(review) comment flags
   the `__aexit__` mock setup.
 * Docstrings of the added tests are written in English.
 * Leading whitespace was reconstructed from the Python structure. Check the
   indentation of the context lines against the target tree before applying
   (or apply with --ignore-whitespace). The "index" blob ids are unchanged
   from the earlier revision of this patch.

 .../test_queue_repository.py                  | 234 +++++++++++++++++-
 tests/unit/test_workers_base.py               |  65 +++++
 tests/unit/test_workers_reaper.py             |  27 ++
 3 files changed, 325 insertions(+), 1 deletion(-)
 create mode 100644 tests/unit/test_workers_reaper.py

diff --git a/tests/integration_tests/test_queue_repository.py b/tests/integration_tests/test_queue_repository.py
index 917aff6..028755d 100644
--- a/tests/integration_tests/test_queue_repository.py
+++ b/tests/integration_tests/test_queue_repository.py
@@ -1,11 +1,14 @@
 # tests/integration_tests/test_queue_repository.py
 from __future__ import annotations
 
-from datetime import datetime, timezone
+from datetime import datetime, timezone, timedelta
+from uuid import uuid4
 
 import pytest
+from sqlalchemy import select
 from sqlalchemy.ext.asyncio import AsyncSession
 
+from dataloader.storage.models import DLJob
 from dataloader.storage.repositories import QueueRepository
 from dataloader.storage.schemas import CreateJobRequest, JobStatus
 
@@ -451,3 +454,232 @@ class TestQueueRepository:
         status = await repo.get_status(job_id)
         assert status is not None
         assert status.status == "queued"
+
+    async def test_claim_one_fails_on_advisory_lock_and_sets_backoff(
+        self,
+        db_session: AsyncSession,
+        clean_queue_tables,
+        job_id: str,
+        queue_name: str,
+        task_name: str,
+    ):
+        """
+        Advisory-lock failure branch: the job goes back to queued with a deferred available_at.
+        """
+        repo = QueueRepository(db_session)
+
+        req = CreateJobRequest(
+            job_id=job_id,
+            queue=queue_name,
+            task=task_name,
+            args={"k": "v"},
+            idempotency_key=None,
+            lock_key="lock-fail-adv",
+            partition_key="",
+            priority=10,
+            available_at=datetime.now(timezone.utc),
+            max_attempts=5,
+            lease_ttl_sec=30,
+            producer=None,
+            consumer_group=None,
+        )
+        await repo.create_or_get(req)
+
+        async def _false_lock(_: str) -> bool:
+            return False
+
+        repo._try_advisory_lock = _false_lock  # type: ignore[method-assign]
+
+        before = datetime.now(timezone.utc)
+        claimed = await repo.claim_one(queue_name, claim_backoff_sec=15)
+        after = datetime.now(timezone.utc)
+
+        assert claimed is None
+
+        st = await repo.get_status(job_id)
+        assert st is not None
+        assert st.status == "queued"
+
+        row = (await db_session.execute(select(DLJob).where(DLJob.job_id == job_id))).scalar_one()
+        assert row.available_at >= before + timedelta(seconds=15)
+        assert row.available_at <= after + timedelta(seconds=60)
+
+    async def test_heartbeat_when_not_running_returns_false(
+        self,
+        db_session: AsyncSession,
+        clean_queue_tables,
+        job_id: str,
+        queue_name: str,
+        task_name: str,
+    ):
+        """
+        Heartbeat for a job that is not running returns (False, False).
+        """
+        repo = QueueRepository(db_session)
+
+        req = CreateJobRequest(
+            job_id=job_id,
+            queue=queue_name,
+            task=task_name,
+            args={},
+            idempotency_key=None,
+            lock_key="lock-hb-not-running",
+            partition_key="",
+            priority=100,
+            available_at=datetime.now(timezone.utc),
+            max_attempts=5,
+            lease_ttl_sec=60,
+            producer=None,
+            consumer_group=None,
+        )
+        await repo.create_or_get(req)
+
+        ok, cancel = await repo.heartbeat(job_id, ttl_sec=30)
+        assert ok is False
+        assert cancel is False
+
+    async def test_finish_fail_or_retry_marks_canceled_branch(
+        self,
+        db_session: AsyncSession,
+        clean_queue_tables,
+        job_id: str,
+        queue_name: str,
+        task_name: str,
+    ):
+        """
+        The is_canceled=True branch marks the job as canceled and finishes it.
+        """
+        repo = QueueRepository(db_session)
+
+        req = CreateJobRequest(
+            job_id=job_id,
+            queue=queue_name,
+            task=task_name,
+            args={},
+            idempotency_key=None,
+            lock_key="lock-cancel",
+            partition_key="",
+            priority=100,
+            available_at=datetime.now(timezone.utc),
+            max_attempts=5,
+            lease_ttl_sec=60,
+            producer=None,
+            consumer_group=None,
+        )
+        await repo.create_or_get(req)
+        await repo.claim_one(queue_name, claim_backoff_sec=5)
+
+        await repo.finish_fail_or_retry(job_id, err="Canceled by test", is_canceled=True)
+
+        st = await repo.get_status(job_id)
+        assert st is not None
+        assert st.status == "canceled"
+        assert st.error == "Canceled by test"
+        assert st.finished_at is not None
+
+    async def test_requeue_lost_no_expired_returns_empty(
+        self,
+        db_session: AsyncSession,
+        clean_queue_tables,
+        job_id: str,
+        queue_name: str,
+        task_name: str,
+    ):
+        """
+        requeue_lost returns an empty list when no jobs have expired.
+        """
+        repo = QueueRepository(db_session)
+
+        req = CreateJobRequest(
+            job_id=job_id,
+            queue=queue_name,
+            task=task_name,
+            args={},
+            idempotency_key=None,
+            lock_key="lock-none-expired",
+            partition_key="",
+            priority=100,
+            available_at=datetime.now(timezone.utc),
+            max_attempts=5,
+            lease_ttl_sec=120,
+            producer=None,
+            consumer_group=None,
+        )
+        await repo.create_or_get(req)
+        await repo.claim_one(queue_name, claim_backoff_sec=5)
+
+        res = await repo.requeue_lost(now=datetime.now(timezone.utc))
+        assert res == []
+
+        st = await repo.get_status(job_id)
+        assert st is not None
+        assert st.status == "running"
+
+    async def test_private_helpers_resolve_queue_and_advisory_unlock_are_executable(
+        self,
+        db_session: AsyncSession,
+        clean_queue_tables,
+        job_id: str,
+        queue_name: str,
+        task_name: str,
+    ):
+        """
+        Direct runs of private methods to cover rarely taken branches.
+        """
+        repo = QueueRepository(db_session)
+
+        rq = CreateJobRequest(
+            job_id=job_id,
+            queue=queue_name,
+            task=task_name,
+            args={},
+            idempotency_key=None,
+            lock_key="lock-direct-unlock",
+            partition_key="",
+            priority=1,
+            available_at=datetime.now(timezone.utc),
+            max_attempts=1,
+            lease_ttl_sec=5,
+            producer=None,
+            consumer_group=None,
+        )
+        await repo.create_or_get(rq)
+
+        missing_uuid = str(uuid4())
+        qname = await repo._resolve_queue(missing_uuid)  # type: ignore[attr-defined]
+        assert qname == ""
+
+        await repo._advisory_unlock("lock-direct-unlock")  # type: ignore[attr-defined]
+
+    async def test_cancel_returns_false_for_nonexistent_job(
+        self,
+        db_session: AsyncSession,
+        clean_queue_tables,
+    ):
+        """
+        Returns False when cancelling a nonexistent job.
+        """
+        repo = QueueRepository(db_session)
+        assert await repo.cancel(str(uuid4())) is False
+
+    async def test_finish_ok_silent_when_job_absent(
+        self,
+        db_session: AsyncSession,
+        clean_queue_tables,
+    ):
+        """
+        Completes silently if the job is not found.
+        """
+        repo = QueueRepository(db_session)
+        await repo.finish_ok(str(uuid4()))
+
+    async def test_finish_fail_or_retry_noop_when_job_absent(
+        self,
+        db_session: AsyncSession,
+        clean_queue_tables,
+    ):
+        """
+        Exits silently when the job is absent.
+        """
+        repo = QueueRepository(db_session)
+        await repo.finish_fail_or_retry(str(uuid4()), err="no-op")
diff --git a/tests/unit/test_workers_base.py b/tests/unit/test_workers_base.py
index 6e8adb5..383c920 100644
--- a/tests/unit/test_workers_base.py
+++ b/tests/unit/test_workers_base.py
@@ -444,3 +444,68 @@ class TestPGWorker:
                 results.append(_)
 
             assert len(results) == 3
+
+    @pytest.mark.asyncio
+    async def test_claim_and_execute_once_handles_shutdown_cancelled_error(self):
+        """CancelledError during execution marks the job canceled via finish_fail_or_retry."""
+        cfg = WorkerConfig(queue="test", heartbeat_sec=10, claim_backoff_sec=5)
+        stop_event = asyncio.Event()
+
+        with patch("dataloader.workers.base.APP_CTX") as mock_ctx, \
+             patch("dataloader.workers.base.QueueRepository") as mock_repo_cls:
+
+            mock_session = AsyncMock()
+            mock_sm = MagicMock()
+            mock_sm.return_value.__aenter__.return_value = mock_session
+            # NOTE(review): presumably the awaited __aexit__ result is this AsyncMock
+            # instance (truthy) rather than False; if so, the mocked ``async with``
+            # suppresses exceptions raised in its body -- confirm this is intended.
+            mock_sm.return_value.__aexit__.return_value = AsyncMock(return_value=False)
+            mock_ctx.get_logger.return_value = Mock()
+            mock_ctx.sessionmaker = mock_sm
+
+            mock_repo = Mock()
+            mock_repo.claim_one = AsyncMock(return_value={
+                "job_id": "test-job-id",
+                "lease_ttl_sec": 60,
+                "task": "test.task",
+                "args": {}
+            })
+            mock_repo.finish_fail_or_retry = AsyncMock()
+            mock_repo_cls.return_value = mock_repo
+
+            worker = PGWorker(cfg, stop_event)
+
+            async def raise_cancel(*_args, **_kwargs):
+                raise asyncio.CancelledError()
+
+            with patch.object(worker, "_execute_with_heartbeat", new=raise_cancel):
+                await worker._claim_and_execute_once()
+
+            mock_repo.finish_fail_or_retry.assert_called_once()
+            args, kwargs = mock_repo.finish_fail_or_retry.call_args
+            assert args[0] == "test-job-id"
+            assert "cancelled by shutdown" in args[1]
+            assert kwargs.get("is_canceled") is True
+
+    @pytest.mark.asyncio
+    async def test_execute_with_heartbeat_raises_cancelled_when_stop_set(self):
+        """_execute_with_heartbeat raises CancelledError when the stop event is already set."""
+        cfg = WorkerConfig(queue="test", heartbeat_sec=1000, claim_backoff_sec=5)
+        stop_event = asyncio.Event()
+        stop_event.set()
+
+        with patch("dataloader.workers.base.APP_CTX") as mock_ctx, \
+             patch("dataloader.workers.base.QueueRepository") as mock_repo_cls:
+
+            mock_ctx.get_logger.return_value = Mock()
+            mock_ctx.sessionmaker = Mock()
+            mock_repo_cls.return_value = Mock()
+
+            worker = PGWorker(cfg, stop_event)
+
+            async def one_yield():
+                yield
+
+            with pytest.raises(asyncio.CancelledError):
+                await worker._execute_with_heartbeat("job-id", 60, one_yield())
diff --git a/tests/unit/test_workers_reaper.py b/tests/unit/test_workers_reaper.py
new file mode 100644
index 0000000..43669db
--- /dev/null
+++ b/tests/unit/test_workers_reaper.py
@@ -0,0 +1,27 @@
+# tests/unit/test_workers_reaper.py
+from __future__ import annotations
+
+import pytest
+from unittest.mock import AsyncMock, patch, Mock
+
+from dataloader.workers.reaper import requeue_lost
+
+
+@pytest.mark.unit
+@pytest.mark.asyncio
+async def test_requeue_lost_calls_repository_and_returns_ids():
+    """
+    Check that requeue_lost calls QueueRepository.requeue_lost and returns its result.
+    """
+    fake_session = Mock()
+
+    with patch("dataloader.workers.reaper.QueueRepository") as repo_cls:
+        repo = Mock()
+        repo.requeue_lost = AsyncMock(return_value=["id1", "id2"])
+        repo_cls.return_value = repo
+
+        res = await requeue_lost(fake_session)
+
+        assert res == ["id1", "id2"]
+        repo_cls.assert_called_once_with(fake_session)
+        repo.requeue_lost.assert_awaited_once()