dataloader/tests/unit/test_api_service.py

367 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
from datetime import datetime, timezone
from unittest.mock import AsyncMock, Mock, patch
from uuid import UUID
import pytest
from dataloader.api.v1.schemas import TriggerJobRequest
from dataloader.api.v1.service import JobsService
from dataloader.storage.schemas import JobStatus
@pytest.mark.unit
class TestJobsService:
"""
Unit тесты для JobsService.
"""
def test_init_creates_service_with_session(self):
"""
Тест создания сервиса с сессией.
"""
mock_session = AsyncMock()
with patch("dataloader.api.v1.service.get_logger") as mock_get_logger:
mock_get_logger.return_value = Mock()
service = JobsService(mock_session)
assert service._s == mock_session
assert service._repo is not None
@pytest.mark.asyncio
async def test_trigger_creates_new_job(self):
"""
Тест создания новой задачи через trigger.
"""
mock_session = AsyncMock()
with (
patch("dataloader.api.v1.service.get_logger") as mock_get_logger,
patch("dataloader.api.v1.service.QueueRepository") as mock_repo_cls,
patch("dataloader.api.v1.service.new_job_id") as mock_new_job_id,
):
mock_get_logger.return_value = Mock()
mock_new_job_id.return_value = UUID("12345678-1234-5678-1234-567812345678")
mock_repo = Mock()
mock_repo.create_or_get = AsyncMock(
return_value=("12345678-1234-5678-1234-567812345678", "queued")
)
mock_repo_cls.return_value = mock_repo
service = JobsService(mock_session)
req = TriggerJobRequest(
queue="test_queue",
task="test.task",
args={"key": "value"},
lock_key="lock_1",
priority=100,
max_attempts=5,
lease_ttl_sec=60,
)
response = await service.trigger(req)
assert response.job_id == UUID("12345678-1234-5678-1234-567812345678")
assert response.status == "queued"
mock_repo.create_or_get.assert_called_once()
@pytest.mark.asyncio
async def test_trigger_with_idempotency_key(self):
"""
Тест создания задачи с idempotency_key.
"""
mock_session = AsyncMock()
with (
patch("dataloader.api.v1.service.get_logger") as mock_get_logger,
patch("dataloader.api.v1.service.QueueRepository") as mock_repo_cls,
patch("dataloader.api.v1.service.new_job_id") as mock_new_job_id,
):
mock_get_logger.return_value = Mock()
mock_new_job_id.return_value = UUID("12345678-1234-5678-1234-567812345678")
mock_repo = Mock()
mock_repo.create_or_get = AsyncMock(
return_value=("12345678-1234-5678-1234-567812345678", "queued")
)
mock_repo_cls.return_value = mock_repo
service = JobsService(mock_session)
req = TriggerJobRequest(
queue="test_queue",
task="test.task",
args={},
idempotency_key="unique_key_123",
lock_key="lock_1",
priority=100,
max_attempts=5,
lease_ttl_sec=60,
)
response = await service.trigger(req)
assert response.status == "queued"
call_args = mock_repo.create_or_get.call_args[0][0]
assert call_args.idempotency_key == "unique_key_123"
@pytest.mark.asyncio
async def test_trigger_with_available_at(self):
"""
Тест создания задачи с отложенным запуском.
"""
mock_session = AsyncMock()
with (
patch("dataloader.api.v1.service.get_logger") as mock_get_logger,
patch("dataloader.api.v1.service.QueueRepository") as mock_repo_cls,
patch("dataloader.api.v1.service.new_job_id") as mock_new_job_id,
):
mock_get_logger.return_value = Mock()
mock_new_job_id.return_value = UUID("12345678-1234-5678-1234-567812345678")
mock_repo = Mock()
mock_repo.create_or_get = AsyncMock(
return_value=("12345678-1234-5678-1234-567812345678", "queued")
)
mock_repo_cls.return_value = mock_repo
service = JobsService(mock_session)
future_time = datetime(2025, 12, 31, 23, 59, 59, tzinfo=timezone.utc)
req = TriggerJobRequest(
queue="test_queue",
task="test.task",
args={},
lock_key="lock_1",
available_at=future_time,
priority=100,
max_attempts=5,
lease_ttl_sec=60,
)
await service.trigger(req)
call_args = mock_repo.create_or_get.call_args[0][0]
assert call_args.available_at == future_time
@pytest.mark.asyncio
async def test_trigger_with_optional_fields(self):
"""
Тест создания задачи с опциональными полями.
"""
mock_session = AsyncMock()
with (
patch("dataloader.api.v1.service.get_logger") as mock_get_logger,
patch("dataloader.api.v1.service.QueueRepository") as mock_repo_cls,
patch("dataloader.api.v1.service.new_job_id") as mock_new_job_id,
):
mock_get_logger.return_value = Mock()
mock_new_job_id.return_value = UUID("12345678-1234-5678-1234-567812345678")
mock_repo = Mock()
mock_repo.create_or_get = AsyncMock(
return_value=("12345678-1234-5678-1234-567812345678", "queued")
)
mock_repo_cls.return_value = mock_repo
service = JobsService(mock_session)
req = TriggerJobRequest(
queue="test_queue",
task="test.task",
args={},
lock_key="lock_1",
partition_key="partition_1",
producer="test_producer",
consumer_group="test_group",
priority=100,
max_attempts=5,
lease_ttl_sec=60,
)
await service.trigger(req)
call_args = mock_repo.create_or_get.call_args[0][0]
assert call_args.partition_key == "partition_1"
assert call_args.producer == "test_producer"
assert call_args.consumer_group == "test_group"
@pytest.mark.asyncio
async def test_status_returns_job_status(self):
"""
Тест получения статуса существующей задачи.
"""
mock_session = AsyncMock()
with (
patch("dataloader.api.v1.service.get_logger") as mock_get_logger,
patch("dataloader.api.v1.service.QueueRepository") as mock_repo_cls,
):
mock_get_logger.return_value = Mock()
mock_repo = Mock()
mock_status = JobStatus(
job_id="12345678-1234-5678-1234-567812345678",
status="running",
attempt=1,
started_at=datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
finished_at=None,
heartbeat_at=datetime(2025, 1, 1, 12, 5, 0, tzinfo=timezone.utc),
error=None,
progress={"step": 1},
)
mock_repo.get_status = AsyncMock(return_value=mock_status)
mock_repo_cls.return_value = mock_repo
service = JobsService(mock_session)
job_id = UUID("12345678-1234-5678-1234-567812345678")
response = await service.status(job_id)
assert response is not None
assert response.job_id == job_id
assert response.status == "running"
assert response.attempt == 1
assert response.progress == {"step": 1}
@pytest.mark.asyncio
async def test_status_returns_none_for_nonexistent_job(self):
"""
Тест получения статуса несуществующей задачи.
"""
mock_session = AsyncMock()
with (
patch("dataloader.api.v1.service.get_logger") as mock_get_logger,
patch("dataloader.api.v1.service.QueueRepository") as mock_repo_cls,
):
mock_get_logger.return_value = Mock()
mock_repo = Mock()
mock_repo.get_status = AsyncMock(return_value=None)
mock_repo_cls.return_value = mock_repo
service = JobsService(mock_session)
job_id = UUID("00000000-0000-0000-0000-000000000000")
response = await service.status(job_id)
assert response is None
@pytest.mark.asyncio
async def test_cancel_cancels_job_and_returns_status(self):
"""
Тест отмены задачи и получения её статуса.
"""
mock_session = AsyncMock()
with (
patch("dataloader.api.v1.service.get_logger") as mock_get_logger,
patch("dataloader.api.v1.service.QueueRepository") as mock_repo_cls,
):
mock_get_logger.return_value = Mock()
mock_repo = Mock()
mock_repo.cancel = AsyncMock()
mock_status = JobStatus(
job_id="12345678-1234-5678-1234-567812345678",
status="running",
attempt=1,
started_at=datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
finished_at=None,
heartbeat_at=datetime(2025, 1, 1, 12, 5, 0, tzinfo=timezone.utc),
error=None,
progress={},
)
mock_repo.get_status = AsyncMock(return_value=mock_status)
mock_repo_cls.return_value = mock_repo
service = JobsService(mock_session)
job_id = UUID("12345678-1234-5678-1234-567812345678")
response = await service.cancel(job_id)
assert response is not None
assert response.job_id == job_id
assert response.status == "running"
mock_repo.cancel.assert_called_once_with(str(job_id))
@pytest.mark.asyncio
async def test_cancel_returns_none_for_nonexistent_job(self):
"""
Тест отмены несуществующей задачи.
"""
mock_session = AsyncMock()
with (
patch("dataloader.api.v1.service.get_logger") as mock_get_logger,
patch("dataloader.api.v1.service.QueueRepository") as mock_repo_cls,
):
mock_get_logger.return_value = Mock()
mock_repo = Mock()
mock_repo.cancel = AsyncMock()
mock_repo.get_status = AsyncMock(return_value=None)
mock_repo_cls.return_value = mock_repo
service = JobsService(mock_session)
job_id = UUID("00000000-0000-0000-0000-000000000000")
response = await service.cancel(job_id)
assert response is None
mock_repo.cancel.assert_called_once()
@pytest.mark.asyncio
async def test_status_handles_empty_progress(self):
"""
Тест обработки None progress.
"""
mock_session = AsyncMock()
with (
patch("dataloader.api.v1.service.get_logger") as mock_get_logger,
patch("dataloader.api.v1.service.QueueRepository") as mock_repo_cls,
):
mock_get_logger.return_value = Mock()
mock_repo = Mock()
mock_status = JobStatus(
job_id="12345678-1234-5678-1234-567812345678",
status="queued",
attempt=0,
started_at=None,
finished_at=None,
heartbeat_at=None,
error=None,
progress=None,
)
mock_repo.get_status = AsyncMock(return_value=mock_status)
mock_repo_cls.return_value = mock_repo
service = JobsService(mock_session)
job_id = UUID("12345678-1234-5678-1234-567812345678")
response = await service.status(job_id)
assert response is not None
assert response.progress == {}