# dataloader/tests/unit/test_workers_manager.py
#
# 295 lines | 10 KiB | Python
#
# Repository viewer notice: this file contains Unicode characters that might be
# confused with other characters. If this is intentional, the warning can be
# safely ignored.

from __future__ import annotations
import asyncio
from unittest.mock import AsyncMock, MagicMock, Mock, patch
import pytest
from dataloader.workers.manager import WorkerManager, WorkerSpec, build_manager_from_env
@pytest.mark.unit
class TestWorkerManager:
    """
    Unit tests for WorkerManager.

    Each test patches ``APP_CTX``, ``APP_CONFIG`` and ``PGWorker`` (plus
    ``requeue_lost`` for the reaper tests) inside
    ``dataloader.workers.manager`` so the manager runs against mocks.

    Every test that calls ``manager.start()`` stops the manager in a
    ``finally`` block, so a failing assertion cannot leave started work
    running after the test has finished.
    """

    def test_init_creates_manager_with_specs(self):
        """
        A freshly constructed manager keeps its specs and has nothing started.
        """
        specs = [WorkerSpec(queue="test_queue", concurrency=2)]

        with patch("dataloader.workers.manager.APP_CTX") as mock_ctx:
            mock_ctx.get_logger.return_value = Mock()

            manager = WorkerManager(specs)

            assert manager._specs == specs
            assert manager._stop.is_set() is False
            assert manager._tasks == []
            assert manager._reaper_task is None

    @pytest.mark.asyncio
    async def test_start_creates_worker_tasks(self):
        """
        start() creates one task per unit of concurrency plus a reaper task.
        """
        specs = [
            WorkerSpec(queue="queue1", concurrency=2),
            WorkerSpec(queue="queue2", concurrency=1),
        ]

        with (
            patch("dataloader.workers.manager.APP_CTX") as mock_ctx,
            patch("dataloader.workers.manager.APP_CONFIG") as mock_cfg,
            patch("dataloader.workers.manager.PGWorker") as mock_worker_cls,
        ):
            mock_ctx.get_logger.return_value = Mock()
            mock_cfg.worker.heartbeat_sec = 10
            mock_cfg.worker.claim_backoff_sec = 5

            mock_worker_instance = Mock()
            mock_worker_instance.run = AsyncMock()
            mock_worker_cls.return_value = mock_worker_instance

            manager = WorkerManager(specs)
            await manager.start()
            try:
                # 2 workers for queue1 + 1 worker for queue2.
                assert len(manager._tasks) == 3
                assert manager._reaper_task is not None
                assert mock_worker_cls.call_count == 3
            finally:
                # Stop even when an assertion fails, so nothing started above
                # outlives this test.
                await manager.stop()

    @pytest.mark.asyncio
    async def test_start_with_zero_concurrency_creates_one_worker(self):
        """
        A spec with concurrency=0 still results in exactly one worker task.
        """
        specs = [WorkerSpec(queue="test", concurrency=0)]

        with (
            patch("dataloader.workers.manager.APP_CTX") as mock_ctx,
            patch("dataloader.workers.manager.APP_CONFIG") as mock_cfg,
            patch("dataloader.workers.manager.PGWorker") as mock_worker_cls,
        ):
            mock_ctx.get_logger.return_value = Mock()
            mock_cfg.worker.heartbeat_sec = 10
            mock_cfg.worker.claim_backoff_sec = 5

            mock_worker_instance = Mock()
            mock_worker_instance.run = AsyncMock()
            mock_worker_cls.return_value = mock_worker_instance

            manager = WorkerManager(specs)
            await manager.start()
            try:
                assert len(manager._tasks) == 1
            finally:
                # Stop even when the assertion fails.
                await manager.stop()

    @pytest.mark.asyncio
    async def test_stop_cancels_all_tasks(self):
        """
        stop() sets the stop flag and clears both the task list and the reaper.
        """
        specs = [WorkerSpec(queue="test", concurrency=2)]

        with (
            patch("dataloader.workers.manager.APP_CTX") as mock_ctx,
            patch("dataloader.workers.manager.APP_CONFIG") as mock_cfg,
            patch("dataloader.workers.manager.PGWorker") as mock_worker_cls,
        ):
            mock_ctx.get_logger.return_value = Mock()
            mock_cfg.worker.heartbeat_sec = 10
            mock_cfg.worker.claim_backoff_sec = 5

            mock_worker_instance = Mock()
            mock_worker_instance.run = AsyncMock()
            mock_worker_cls.return_value = mock_worker_instance

            manager = WorkerManager(specs)
            # stop() is the behaviour under test and runs right after start(),
            # so no try/finally is needed here.
            await manager.start()
            await manager.stop()

            assert manager._stop.is_set()
            assert len(manager._tasks) == 0
            assert manager._reaper_task is None

    @pytest.mark.asyncio
    async def test_reaper_loop_calls_requeue_lost(self):
        """
        While the manager is running, the reaper calls requeue_lost at least once.
        """
        specs = [WorkerSpec(queue="test", concurrency=1)]

        with (
            patch("dataloader.workers.manager.APP_CTX") as mock_ctx,
            patch("dataloader.workers.manager.APP_CONFIG") as mock_cfg,
            patch("dataloader.workers.manager.PGWorker") as mock_worker_cls,
            patch("dataloader.workers.manager.requeue_lost") as mock_requeue,
        ):
            mock_logger = Mock()
            mock_ctx.get_logger.return_value = mock_logger
            mock_cfg.worker.heartbeat_sec = 10
            mock_cfg.worker.claim_backoff_sec = 5
            mock_cfg.worker.reaper_period_sec = 1

            # ``APP_CTX.sessionmaker()`` is mocked so that entering it with
            # ``async with`` yields ``mock_session``.
            mock_session = AsyncMock()
            mock_sm = MagicMock()
            mock_sm.return_value.__aenter__.return_value = mock_session
            mock_ctx.sessionmaker = mock_sm

            mock_worker_instance = Mock()
            mock_worker_instance.run = AsyncMock()
            mock_worker_cls.return_value = mock_worker_instance

            mock_requeue.return_value = ["job1", "job2"]

            manager = WorkerManager(specs)
            await manager.start()
            try:
                # Wait longer than the configured reaper_period_sec (1 s).
                await asyncio.sleep(1.5)
            finally:
                # Stop even if the wait is interrupted (e.g. cancelled).
                await manager.stop()

            assert mock_requeue.call_count >= 1

    @pytest.mark.asyncio
    async def test_reaper_loop_handles_exceptions(self):
        """
        An exception raised by requeue_lost is logged through logger.exception.
        """
        specs = [WorkerSpec(queue="test", concurrency=1)]

        with (
            patch("dataloader.workers.manager.APP_CTX") as mock_ctx,
            patch("dataloader.workers.manager.APP_CONFIG") as mock_cfg,
            patch("dataloader.workers.manager.PGWorker") as mock_worker_cls,
            patch("dataloader.workers.manager.requeue_lost") as mock_requeue,
        ):
            mock_logger = Mock()
            mock_ctx.get_logger.return_value = mock_logger
            mock_cfg.worker.heartbeat_sec = 10
            mock_cfg.worker.claim_backoff_sec = 5
            mock_cfg.worker.reaper_period_sec = 0.5

            # ``APP_CTX.sessionmaker()`` is mocked so that entering it with
            # ``async with`` yields ``mock_session``.
            mock_session = AsyncMock()
            mock_sm = MagicMock()
            mock_sm.return_value.__aenter__.return_value = mock_session
            mock_ctx.sessionmaker = mock_sm

            mock_worker_instance = Mock()
            mock_worker_instance.run = AsyncMock()
            mock_worker_cls.return_value = mock_worker_instance

            # First call raises, second call returns an empty list.
            mock_requeue.side_effect = [Exception("DB error"), []]

            manager = WorkerManager(specs)
            await manager.start()
            try:
                # Wait for more than two reaper periods (2 x 0.5 s).
                await asyncio.sleep(1.2)
            finally:
                # Stop even if the wait is interrupted (e.g. cancelled).
                await manager.stop()

            # NOTE(review): this only proves the failure was logged; it does
            # not prove the loop kept running afterwards. Consider asserting
            # on mock_requeue.call_count once the reaper timing is confirmed.
            assert mock_logger.exception.call_count >= 1
@pytest.mark.unit
class TestBuildManagerFromEnv:
    """
    Unit tests for build_manager_from_env.

    Each test feeds a list of worker dicts through the patched
    ``APP_CONFIG.worker.parsed_workers()`` and inspects the specs of the
    manager that comes back.
    """

    # Patch targets shared by every test in this class.
    _CTX_TARGET = "dataloader.workers.manager.APP_CTX"
    _CFG_TARGET = "dataloader.workers.manager.APP_CONFIG"

    def test_builds_manager_from_config(self):
        """
        Each configured entry becomes a spec with the same queue and concurrency.
        """
        worker_entries = [
            {"queue": "queue1", "concurrency": 2},
            {"queue": "queue2", "concurrency": 3},
        ]

        with patch(self._CTX_TARGET) as app_ctx, patch(self._CFG_TARGET) as app_cfg:
            app_cfg.worker.parsed_workers.return_value = worker_entries
            app_ctx.get_logger.return_value = Mock()

            built = build_manager_from_env()

            # One comparison covers both the number of specs and their fields.
            summary = [(spec.queue, spec.concurrency) for spec in built._specs]
            assert summary == [("queue1", 2), ("queue2", 3)]

    def test_skips_empty_queue_names(self):
        """
        Entries whose queue name is empty or a single space are dropped.
        """
        worker_entries = [
            {"queue": "", "concurrency": 2},
            {"queue": "valid_queue", "concurrency": 1},
            {"queue": " ", "concurrency": 3},
        ]

        with patch(self._CTX_TARGET) as app_ctx, patch(self._CFG_TARGET) as app_cfg:
            app_cfg.worker.parsed_workers.return_value = worker_entries
            app_ctx.get_logger.return_value = Mock()

            built = build_manager_from_env()

            queue_names = [spec.queue for spec in built._specs]
            assert queue_names == ["valid_queue"]

    def test_handles_missing_fields_with_defaults(self):
        """
        A missing concurrency, like a concurrency of 0, ends up as 1.
        """
        worker_entries = [
            {"queue": "test"},
            {"queue": "test2", "concurrency": 0},
        ]

        with patch(self._CTX_TARGET) as app_ctx, patch(self._CFG_TARGET) as app_cfg:
            app_cfg.worker.parsed_workers.return_value = worker_entries
            app_ctx.get_logger.return_value = Mock()

            built = build_manager_from_env()

            concurrencies = [spec.concurrency for spec in built._specs]
            assert concurrencies == [1, 1]

    def test_ensures_minimum_concurrency_of_one(self):
        """
        Zero and negative concurrency values are both raised to 1.
        """
        worker_entries = [
            {"queue": "test1", "concurrency": 0},
            {"queue": "test2", "concurrency": -5},
        ]

        with patch(self._CTX_TARGET) as app_ctx, patch(self._CFG_TARGET) as app_cfg:
            app_cfg.worker.parsed_workers.return_value = worker_entries
            app_ctx.get_logger.return_value = Mock()

            built = build_manager_from_env()

            concurrencies = [spec.concurrency for spec in built._specs]
            assert concurrencies == [1, 1]