from __future__ import annotations import asyncio from unittest.mock import AsyncMock, MagicMock, Mock, patch import pytest from dataloader.workers.manager import WorkerManager, WorkerSpec, build_manager_from_env @pytest.mark.unit class TestWorkerManager: """ Unit тесты для WorkerManager. """ def test_init_creates_manager_with_specs(self): """ Тест создания менеджера со спецификациями воркеров. """ specs = [WorkerSpec(queue="test_queue", concurrency=2)] with patch("dataloader.workers.manager.APP_CTX") as mock_ctx: mock_ctx.get_logger.return_value = Mock() manager = WorkerManager(specs) assert manager._specs == specs assert manager._stop.is_set() is False assert manager._tasks == [] assert manager._reaper_task is None @pytest.mark.asyncio async def test_start_creates_worker_tasks(self): """ Тест старта воркеров и создания задач. """ specs = [ WorkerSpec(queue="queue1", concurrency=2), WorkerSpec(queue="queue2", concurrency=1), ] with ( patch("dataloader.workers.manager.APP_CTX") as mock_ctx, patch("dataloader.workers.manager.APP_CONFIG") as mock_cfg, patch("dataloader.workers.manager.PGWorker") as mock_worker_cls, ): mock_ctx.get_logger.return_value = Mock() mock_cfg.worker.heartbeat_sec = 10 mock_cfg.worker.claim_backoff_sec = 5 mock_worker_instance = Mock() mock_worker_instance.run = AsyncMock() mock_worker_cls.return_value = mock_worker_instance manager = WorkerManager(specs) await manager.start() assert len(manager._tasks) == 3 assert manager._reaper_task is not None assert mock_worker_cls.call_count == 3 await manager.stop() @pytest.mark.asyncio async def test_start_with_zero_concurrency_creates_one_worker(self): """ Тест, что concurrency=0 создаёт минимум 1 воркер. """ specs = [WorkerSpec(queue="test", concurrency=0)] with ( patch("dataloader.workers.manager.APP_CTX") as mock_ctx, patch("dataloader.workers.manager.APP_CONFIG") as mock_cfg, patch("dataloader.workers.manager.PGWorker") as mock_worker_cls, ): mock_ctx.get_logger.return_value = Mock() mock_cfg.worker.heartbeat_sec = 10 mock_cfg.worker.claim_backoff_sec = 5 mock_worker_instance = Mock() mock_worker_instance.run = AsyncMock() mock_worker_cls.return_value = mock_worker_instance manager = WorkerManager(specs) await manager.start() assert len(manager._tasks) == 1 await manager.stop() @pytest.mark.asyncio async def test_stop_cancels_all_tasks(self): """ Тест остановки всех задач воркеров. """ specs = [WorkerSpec(queue="test", concurrency=2)] with ( patch("dataloader.workers.manager.APP_CTX") as mock_ctx, patch("dataloader.workers.manager.APP_CONFIG") as mock_cfg, patch("dataloader.workers.manager.PGWorker") as mock_worker_cls, ): mock_ctx.get_logger.return_value = Mock() mock_cfg.worker.heartbeat_sec = 10 mock_cfg.worker.claim_backoff_sec = 5 mock_worker_instance = Mock() mock_worker_instance.run = AsyncMock() mock_worker_cls.return_value = mock_worker_instance manager = WorkerManager(specs) await manager.start() await manager.stop() assert manager._stop.is_set() assert len(manager._tasks) == 0 assert manager._reaper_task is None @pytest.mark.asyncio async def test_reaper_loop_calls_requeue_lost(self): """ Тест, что реапер вызывает requeue_lost. """ specs = [WorkerSpec(queue="test", concurrency=1)] with ( patch("dataloader.workers.manager.APP_CTX") as mock_ctx, patch("dataloader.workers.manager.APP_CONFIG") as mock_cfg, patch("dataloader.workers.manager.PGWorker") as mock_worker_cls, patch("dataloader.workers.manager.requeue_lost") as mock_requeue, ): mock_logger = Mock() mock_ctx.get_logger.return_value = mock_logger mock_cfg.worker.heartbeat_sec = 10 mock_cfg.worker.claim_backoff_sec = 5 mock_cfg.worker.reaper_period_sec = 1 mock_session = AsyncMock() mock_sm = MagicMock() mock_sm.return_value.__aenter__.return_value = mock_session mock_ctx.sessionmaker = mock_sm mock_worker_instance = Mock() mock_worker_instance.run = AsyncMock() mock_worker_cls.return_value = mock_worker_instance mock_requeue.return_value = ["job1", "job2"] manager = WorkerManager(specs) await manager.start() await asyncio.sleep(1.5) await manager.stop() assert mock_requeue.call_count >= 1 @pytest.mark.asyncio async def test_reaper_loop_handles_exceptions(self): """ Тест, что реапер обрабатывает исключения и продолжает работу. """ specs = [WorkerSpec(queue="test", concurrency=1)] with ( patch("dataloader.workers.manager.APP_CTX") as mock_ctx, patch("dataloader.workers.manager.APP_CONFIG") as mock_cfg, patch("dataloader.workers.manager.PGWorker") as mock_worker_cls, patch("dataloader.workers.manager.requeue_lost") as mock_requeue, ): mock_logger = Mock() mock_ctx.get_logger.return_value = mock_logger mock_cfg.worker.heartbeat_sec = 10 mock_cfg.worker.claim_backoff_sec = 5 mock_cfg.worker.reaper_period_sec = 0.5 mock_session = AsyncMock() mock_sm = MagicMock() mock_sm.return_value.__aenter__.return_value = mock_session mock_ctx.sessionmaker = mock_sm mock_worker_instance = Mock() mock_worker_instance.run = AsyncMock() mock_worker_cls.return_value = mock_worker_instance mock_requeue.side_effect = [Exception("DB error"), []] manager = WorkerManager(specs) await manager.start() await asyncio.sleep(1.2) await manager.stop() assert mock_logger.exception.call_count >= 1 @pytest.mark.unit class TestBuildManagerFromEnv: """ Unit тесты для build_manager_from_env. """ def test_builds_manager_from_config(self): """ Тест создания менеджера из конфигурации. """ with ( patch("dataloader.workers.manager.APP_CTX") as mock_ctx, patch("dataloader.workers.manager.APP_CONFIG") as mock_cfg, ): mock_ctx.get_logger.return_value = Mock() mock_cfg.worker.parsed_workers.return_value = [ {"queue": "queue1", "concurrency": 2}, {"queue": "queue2", "concurrency": 3}, ] manager = build_manager_from_env() assert len(manager._specs) == 2 assert manager._specs[0].queue == "queue1" assert manager._specs[0].concurrency == 2 assert manager._specs[1].queue == "queue2" assert manager._specs[1].concurrency == 3 def test_skips_empty_queue_names(self): """ Тест, что пустые имена очередей пропускаются. """ with ( patch("dataloader.workers.manager.APP_CTX") as mock_ctx, patch("dataloader.workers.manager.APP_CONFIG") as mock_cfg, ): mock_ctx.get_logger.return_value = Mock() mock_cfg.worker.parsed_workers.return_value = [ {"queue": "", "concurrency": 2}, {"queue": "valid_queue", "concurrency": 1}, {"queue": " ", "concurrency": 3}, ] manager = build_manager_from_env() assert len(manager._specs) == 1 assert manager._specs[0].queue == "valid_queue" def test_handles_missing_fields_with_defaults(self): """ Тест обработки отсутствующих полей с дефолтными значениями. """ with ( patch("dataloader.workers.manager.APP_CTX") as mock_ctx, patch("dataloader.workers.manager.APP_CONFIG") as mock_cfg, ): mock_ctx.get_logger.return_value = Mock() mock_cfg.worker.parsed_workers.return_value = [ {"queue": "test"}, {"queue": "test2", "concurrency": 0}, ] manager = build_manager_from_env() assert len(manager._specs) == 2 assert manager._specs[0].concurrency == 1 assert manager._specs[1].concurrency == 1 def test_ensures_minimum_concurrency_of_one(self): """ Тест, что concurrency всегда минимум 1. """ with ( patch("dataloader.workers.manager.APP_CTX") as mock_ctx, patch("dataloader.workers.manager.APP_CONFIG") as mock_cfg, ): mock_ctx.get_logger.return_value = Mock() mock_cfg.worker.parsed_workers.return_value = [ {"queue": "test1", "concurrency": 0}, {"queue": "test2", "concurrency": -5}, ] manager = build_manager_from_env() assert len(manager._specs) == 2 assert manager._specs[0].concurrency == 1 assert manager._specs[1].concurrency == 1