from __future__ import annotations

from collections.abc import Iterator
from contextlib import contextmanager, ExitStack
from datetime import datetime, timezone
from unittest.mock import AsyncMock, Mock, patch
from uuid import UUID

import pytest

from dataloader.api.v1.schemas import TriggerJobRequest
from dataloader.api.v1.service import JobsService
from dataloader.storage.schemas import JobStatus

# Dotted path of the module whose collaborators are replaced with mocks.
_SERVICE_MODULE = "dataloader.api.v1.service"

# Identifier shared by the mocked repository results and the expected responses.
_JOB_ID_STR = "12345678-1234-5678-1234-567812345678"
_JOB_ID = UUID(_JOB_ID_STR)

# Identifier used by the tests where the repository reports no such job.
_MISSING_JOB_ID = UUID("00000000-0000-0000-0000-000000000000")


@contextmanager
def _patched_service(
    *, fixed_job_id: UUID | None = None, **repo_methods: object
) -> Iterator[tuple[JobsService, Mock]]:
    """Yield a ``JobsService`` wired to a mocked repository.

    While the context is active, ``get_logger`` and ``QueueRepository`` in the
    service module are patched; ``QueueRepository(...)`` returns the yielded
    repository mock.

    Args:
        fixed_job_id: When given, ``new_job_id`` in the service module is also
            patched to return this value.
        **repo_methods: Attributes to set on the repository mock before the
            service is constructed (e.g. ``get_status=AsyncMock(...)``).

    Yields:
        A ``(service, repo)`` pair.
    """
    repo = Mock(**repo_methods)
    with ExitStack() as stack:
        stack.enter_context(
            patch(f"{_SERVICE_MODULE}.get_logger", return_value=Mock())
        )
        stack.enter_context(
            patch(f"{_SERVICE_MODULE}.QueueRepository", return_value=repo)
        )
        if fixed_job_id is not None:
            stack.enter_context(
                patch(f"{_SERVICE_MODULE}.new_job_id", return_value=fixed_job_id)
            )
        # The service is built (and later used by the test) inside the patches.
        yield JobsService(AsyncMock()), repo


def _trigger_service():
    """Return the patched-service context used by every ``trigger`` test.

    ``new_job_id`` is pinned to ``_JOB_ID`` and ``create_or_get`` resolves to
    ``(_JOB_ID_STR, "queued")``.
    """
    return _patched_service(
        fixed_job_id=_JOB_ID,
        create_or_get=AsyncMock(return_value=(_JOB_ID_STR, "queued")),
    )


def _make_request(**overrides: object) -> TriggerJobRequest:
    """Build a ``TriggerJobRequest`` from shared defaults plus ``overrides``."""
    fields: dict[str, object] = {
        "queue": "test_queue",
        "task": "test.task",
        "args": {},
        "lock_key": "lock_1",
        "priority": 100,
        "max_attempts": 5,
        "lease_ttl_sec": 60,
    }
    fields.update(overrides)
    return TriggerJobRequest(**fields)


def _make_job_status(**overrides: object) -> JobStatus:
    """Build a ``JobStatus`` for ``_JOB_ID_STR``; defaults describe a running job."""
    fields: dict[str, object] = {
        "job_id": _JOB_ID_STR,
        "status": "running",
        "attempt": 1,
        "started_at": datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
        "finished_at": None,
        "heartbeat_at": datetime(2025, 1, 1, 12, 5, 0, tzinfo=timezone.utc),
        "error": None,
        "progress": {},
    }
    fields.update(overrides)
    return JobStatus(**fields)


@pytest.mark.unit
class TestJobsService:
    """Unit tests for JobsService."""

    def test_init_creates_service_with_session(self):
        """The constructor keeps the session in ``_s`` and sets ``_repo``."""
        mock_session = AsyncMock()

        # Only the logger is patched here; the repository class is left as-is.
        with patch(f"{_SERVICE_MODULE}.get_logger") as mock_get_logger:
            mock_get_logger.return_value = Mock()

            service = JobsService(mock_session)

            assert service._s == mock_session
            assert service._repo is not None

    @pytest.mark.asyncio
    async def test_trigger_creates_new_job(self):
        """``trigger`` returns the job id and status reported by the repository."""
        with _trigger_service() as (service, repo):
            req = _make_request(args={"key": "value"})

            response = await service.trigger(req)

            assert response.job_id == _JOB_ID
            assert response.status == "queued"
            repo.create_or_get.assert_called_once()

    @pytest.mark.asyncio
    async def test_trigger_with_idempotency_key(self):
        """``trigger`` forwards ``idempotency_key`` to ``create_or_get``."""
        with _trigger_service() as (service, repo):
            req = _make_request(idempotency_key="unique_key_123")

            response = await service.trigger(req)

            assert response.status == "queued"
            create_request = repo.create_or_get.call_args.args[0]
            assert create_request.idempotency_key == "unique_key_123"

    @pytest.mark.asyncio
    async def test_trigger_with_available_at(self):
        """``trigger`` forwards a deferred ``available_at`` to ``create_or_get``."""
        with _trigger_service() as (service, repo):
            future_time = datetime(2025, 12, 31, 23, 59, 59, tzinfo=timezone.utc)
            req = _make_request(available_at=future_time)

            await service.trigger(req)

            create_request = repo.create_or_get.call_args.args[0]
            assert create_request.available_at == future_time

    @pytest.mark.asyncio
    async def test_trigger_with_optional_fields(self):
        """``trigger`` forwards partition key, producer and consumer group."""
        with _trigger_service() as (service, repo):
            req = _make_request(
                partition_key="partition_1",
                producer="test_producer",
                consumer_group="test_group",
            )

            await service.trigger(req)

            create_request = repo.create_or_get.call_args.args[0]
            assert create_request.partition_key == "partition_1"
            assert create_request.producer == "test_producer"
            assert create_request.consumer_group == "test_group"

    @pytest.mark.asyncio
    async def test_status_returns_job_status(self):
        """``status`` maps the repository's job status onto the response."""
        stored = _make_job_status(progress={"step": 1})

        with _patched_service(get_status=AsyncMock(return_value=stored)) as (
            service,
            _repo,
        ):
            response = await service.status(_JOB_ID)

            assert response is not None
            assert response.job_id == _JOB_ID
            assert response.status == "running"
            assert response.attempt == 1
            assert response.progress == {"step": 1}

    @pytest.mark.asyncio
    async def test_status_returns_none_for_nonexistent_job(self):
        """``status`` returns ``None`` when the repository has no such job."""
        with _patched_service(get_status=AsyncMock(return_value=None)) as (
            service,
            _repo,
        ):
            response = await service.status(_MISSING_JOB_ID)

            assert response is None

    @pytest.mark.asyncio
    async def test_cancel_cancels_job_and_returns_status(self):
        """``cancel`` calls the repository's ``cancel`` and returns the status."""
        stored = _make_job_status()

        with _patched_service(
            cancel=AsyncMock(),
            get_status=AsyncMock(return_value=stored),
        ) as (service, repo):
            response = await service.cancel(_JOB_ID)

            assert response is not None
            assert response.job_id == _JOB_ID
            assert response.status == "running"
            repo.cancel.assert_called_once_with(str(_JOB_ID))

    @pytest.mark.asyncio
    async def test_cancel_returns_none_for_nonexistent_job(self):
        """``cancel`` returns ``None`` when the repository has no such job."""
        with _patched_service(
            cancel=AsyncMock(),
            get_status=AsyncMock(return_value=None),
        ) as (service, repo):
            response = await service.cancel(_MISSING_JOB_ID)

            assert response is None
            # Same argument check as the existing-job test, so a wrong id
            # being passed to the repository does not go unnoticed.
            repo.cancel.assert_called_once_with(str(_MISSING_JOB_ID))

    @pytest.mark.asyncio
    async def test_status_handles_empty_progress(self):
        """A ``None`` progress from the repository is reported as ``{}``."""
        stored = _make_job_status(
            status="queued",
            attempt=0,
            started_at=None,
            heartbeat_at=None,
            progress=None,
        )

        with _patched_service(get_status=AsyncMock(return_value=stored)) as (
            service,
            _repo,
        ):
            response = await service.status(_JOB_ID)

            assert response is not None
            assert response.progress == {}