diff --git a/.coveragerc b/.coveragerc index 16ec8bb..5d87505 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,3 +1,30 @@ [run] +source = src/dataloader omit = + # Логирование - шаблонный код, не требует тестирования src/dataloader/logger/* + # Точка входа - не требует тестирования + src/dataloader/__main__.py + # Базовые классы без логики + src/dataloader/base.py + # Middleware - сложно тестировать, покрыт интеграционными тестами + src/dataloader/api/middleware.py + # Тестовые файлы + */tests/* + +[report] +exclude_lines = + pragma: no cover + def __repr__ + raise AssertionError + raise NotImplementedError + if __name__ == .__main__.: + if TYPE_CHECKING: + @abstractmethod + \.\.\. + +precision = 2 +show_missing = True + +[html] +directory = htmlcov diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 0000000..f28a885 --- /dev/null +++ b/tests/unit/__init__.py @@ -0,0 +1 @@ +# tests/unit/__init__.py diff --git a/tests/unit/test_api_service.py b/tests/unit/test_api_service.py new file mode 100644 index 0000000..31ae26a --- /dev/null +++ b/tests/unit/test_api_service.py @@ -0,0 +1,340 @@ +# tests/unit/test_api_service.py +from __future__ import annotations + +from datetime import datetime, timezone +from uuid import UUID +from unittest.mock import AsyncMock, Mock, patch +import pytest + +from dataloader.api.v1.service import JobsService +from dataloader.api.v1.schemas import TriggerJobRequest +from dataloader.storage.schemas import JobStatus + + +@pytest.mark.unit +class TestJobsService: + """ + Unit тесты для JobsService. + """ + + def test_init_creates_service_with_session(self): + """ + Тест создания сервиса с сессией. + """ + mock_session = AsyncMock() + + with patch("dataloader.api.v1.service.get_logger") as mock_get_logger: + mock_get_logger.return_value = Mock() + + service = JobsService(mock_session) + + assert service._s == mock_session + assert service._repo is not None + + @pytest.mark.asyncio + async def test_trigger_creates_new_job(self): + """ + Тест создания новой задачи через trigger. + """ + mock_session = AsyncMock() + + with patch("dataloader.api.v1.service.get_logger") as mock_get_logger, \ + patch("dataloader.api.v1.service.QueueRepository") as mock_repo_cls, \ + patch("dataloader.api.v1.service.new_job_id") as mock_new_job_id: + + mock_get_logger.return_value = Mock() + mock_new_job_id.return_value = UUID("12345678-1234-5678-1234-567812345678") + + mock_repo = Mock() + mock_repo.create_or_get = AsyncMock(return_value=("12345678-1234-5678-1234-567812345678", "queued")) + mock_repo_cls.return_value = mock_repo + + service = JobsService(mock_session) + + req = TriggerJobRequest( + queue="test_queue", + task="test.task", + args={"key": "value"}, + lock_key="lock_1", + priority=100, + max_attempts=5, + lease_ttl_sec=60 + ) + + response = await service.trigger(req) + + assert response.job_id == UUID("12345678-1234-5678-1234-567812345678") + assert response.status == "queued" + mock_repo.create_or_get.assert_called_once() + + @pytest.mark.asyncio + async def test_trigger_with_idempotency_key(self): + """ + Тест создания задачи с idempotency_key. + """ + mock_session = AsyncMock() + + with patch("dataloader.api.v1.service.get_logger") as mock_get_logger, \ + patch("dataloader.api.v1.service.QueueRepository") as mock_repo_cls, \ + patch("dataloader.api.v1.service.new_job_id") as mock_new_job_id: + + mock_get_logger.return_value = Mock() + mock_new_job_id.return_value = UUID("12345678-1234-5678-1234-567812345678") + + mock_repo = Mock() + mock_repo.create_or_get = AsyncMock(return_value=("12345678-1234-5678-1234-567812345678", "queued")) + mock_repo_cls.return_value = mock_repo + + service = JobsService(mock_session) + + req = TriggerJobRequest( + queue="test_queue", + task="test.task", + args={}, + idempotency_key="unique_key_123", + lock_key="lock_1", + priority=100, + max_attempts=5, + lease_ttl_sec=60 + ) + + response = await service.trigger(req) + + assert response.status == "queued" + + call_args = mock_repo.create_or_get.call_args[0][0] + assert call_args.idempotency_key == "unique_key_123" + + @pytest.mark.asyncio + async def test_trigger_with_available_at(self): + """ + Тест создания задачи с отложенным запуском. + """ + mock_session = AsyncMock() + + with patch("dataloader.api.v1.service.get_logger") as mock_get_logger, \ + patch("dataloader.api.v1.service.QueueRepository") as mock_repo_cls, \ + patch("dataloader.api.v1.service.new_job_id") as mock_new_job_id: + + mock_get_logger.return_value = Mock() + mock_new_job_id.return_value = UUID("12345678-1234-5678-1234-567812345678") + + mock_repo = Mock() + mock_repo.create_or_get = AsyncMock(return_value=("12345678-1234-5678-1234-567812345678", "queued")) + mock_repo_cls.return_value = mock_repo + + service = JobsService(mock_session) + + future_time = datetime(2025, 12, 31, 23, 59, 59, tzinfo=timezone.utc) + + req = TriggerJobRequest( + queue="test_queue", + task="test.task", + args={}, + lock_key="lock_1", + available_at=future_time, + priority=100, + max_attempts=5, + lease_ttl_sec=60 + ) + + response = await service.trigger(req) + + call_args = mock_repo.create_or_get.call_args[0][0] + assert call_args.available_at == future_time + + @pytest.mark.asyncio + async def test_trigger_with_optional_fields(self): + """ + Тест создания задачи с опциональными полями. + """ + mock_session = AsyncMock() + + with patch("dataloader.api.v1.service.get_logger") as mock_get_logger, \ + patch("dataloader.api.v1.service.QueueRepository") as mock_repo_cls, \ + patch("dataloader.api.v1.service.new_job_id") as mock_new_job_id: + + mock_get_logger.return_value = Mock() + mock_new_job_id.return_value = UUID("12345678-1234-5678-1234-567812345678") + + mock_repo = Mock() + mock_repo.create_or_get = AsyncMock(return_value=("12345678-1234-5678-1234-567812345678", "queued")) + mock_repo_cls.return_value = mock_repo + + service = JobsService(mock_session) + + req = TriggerJobRequest( + queue="test_queue", + task="test.task", + args={}, + lock_key="lock_1", + partition_key="partition_1", + producer="test_producer", + consumer_group="test_group", + priority=100, + max_attempts=5, + lease_ttl_sec=60 + ) + + response = await service.trigger(req) + + call_args = mock_repo.create_or_get.call_args[0][0] + assert call_args.partition_key == "partition_1" + assert call_args.producer == "test_producer" + assert call_args.consumer_group == "test_group" + + @pytest.mark.asyncio + async def test_status_returns_job_status(self): + """ + Тест получения статуса существующей задачи. + """ + mock_session = AsyncMock() + + with patch("dataloader.api.v1.service.get_logger") as mock_get_logger, \ + patch("dataloader.api.v1.service.QueueRepository") as mock_repo_cls: + + mock_get_logger.return_value = Mock() + + mock_repo = Mock() + mock_status = JobStatus( + job_id="12345678-1234-5678-1234-567812345678", + status="running", + attempt=1, + started_at=datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + finished_at=None, + heartbeat_at=datetime(2025, 1, 1, 12, 5, 0, tzinfo=timezone.utc), + error=None, + progress={"step": 1} + ) + mock_repo.get_status = AsyncMock(return_value=mock_status) + mock_repo_cls.return_value = mock_repo + + service = JobsService(mock_session) + + job_id = UUID("12345678-1234-5678-1234-567812345678") + response = await service.status(job_id) + + assert response is not None + assert response.job_id == job_id + assert response.status == "running" + assert response.attempt == 1 + assert response.progress == {"step": 1} + + @pytest.mark.asyncio + async def test_status_returns_none_for_nonexistent_job(self): + """ + Тест получения статуса несуществующей задачи. + """ + mock_session = AsyncMock() + + with patch("dataloader.api.v1.service.get_logger") as mock_get_logger, \ + patch("dataloader.api.v1.service.QueueRepository") as mock_repo_cls: + + mock_get_logger.return_value = Mock() + + mock_repo = Mock() + mock_repo.get_status = AsyncMock(return_value=None) + mock_repo_cls.return_value = mock_repo + + service = JobsService(mock_session) + + job_id = UUID("00000000-0000-0000-0000-000000000000") + response = await service.status(job_id) + + assert response is None + + @pytest.mark.asyncio + async def test_cancel_cancels_job_and_returns_status(self): + """ + Тест отмены задачи и получения её статуса. + """ + mock_session = AsyncMock() + + with patch("dataloader.api.v1.service.get_logger") as mock_get_logger, \ + patch("dataloader.api.v1.service.QueueRepository") as mock_repo_cls: + + mock_get_logger.return_value = Mock() + + mock_repo = Mock() + mock_repo.cancel = AsyncMock() + mock_status = JobStatus( + job_id="12345678-1234-5678-1234-567812345678", + status="running", + attempt=1, + started_at=datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + finished_at=None, + heartbeat_at=datetime(2025, 1, 1, 12, 5, 0, tzinfo=timezone.utc), + error=None, + progress={} + ) + mock_repo.get_status = AsyncMock(return_value=mock_status) + mock_repo_cls.return_value = mock_repo + + service = JobsService(mock_session) + + job_id = UUID("12345678-1234-5678-1234-567812345678") + response = await service.cancel(job_id) + + assert response is not None + assert response.job_id == job_id + assert response.status == "running" + mock_repo.cancel.assert_called_once_with(str(job_id)) + + @pytest.mark.asyncio + async def test_cancel_returns_none_for_nonexistent_job(self): + """ + Тест отмены несуществующей задачи. + """ + mock_session = AsyncMock() + + with patch("dataloader.api.v1.service.get_logger") as mock_get_logger, \ + patch("dataloader.api.v1.service.QueueRepository") as mock_repo_cls: + + mock_get_logger.return_value = Mock() + + mock_repo = Mock() + mock_repo.cancel = AsyncMock() + mock_repo.get_status = AsyncMock(return_value=None) + mock_repo_cls.return_value = mock_repo + + service = JobsService(mock_session) + + job_id = UUID("00000000-0000-0000-0000-000000000000") + response = await service.cancel(job_id) + + assert response is None + mock_repo.cancel.assert_called_once() + + @pytest.mark.asyncio + async def test_status_handles_empty_progress(self): + """ + Тест обработки None progress. + """ + mock_session = AsyncMock() + + with patch("dataloader.api.v1.service.get_logger") as mock_get_logger, \ + patch("dataloader.api.v1.service.QueueRepository") as mock_repo_cls: + + mock_get_logger.return_value = Mock() + + mock_repo = Mock() + mock_status = JobStatus( + job_id="12345678-1234-5678-1234-567812345678", + status="queued", + attempt=0, + started_at=None, + finished_at=None, + heartbeat_at=None, + error=None, + progress=None + ) + mock_repo.get_status = AsyncMock(return_value=mock_status) + mock_repo_cls.return_value = mock_repo + + service = JobsService(mock_session) + + job_id = UUID("12345678-1234-5678-1234-567812345678") + response = await service.status(job_id) + + assert response is not None + assert response.progress == {} diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py new file mode 100644 index 0000000..0c82df8 --- /dev/null +++ b/tests/unit/test_config.py @@ -0,0 +1,388 @@ +# tests/unit/test_config.py +from __future__ import annotations + +import json +from logging import DEBUG, INFO +from unittest.mock import patch +import pytest + +from dataloader.config import ( + BaseAppSettings, + AppSettings, + LogSettings, + PGSettings, + WorkerSettings, + Secrets, +) + + +@pytest.mark.unit +class TestBaseAppSettings: + """ + Unit тесты для BaseAppSettings. + """ + + def test_default_values(self): + """ + Тест дефолтных значений. + """ + settings = BaseAppSettings() + + assert settings.local is False + assert settings.debug is False + + def test_protocol_returns_http_when_not_local(self): + """ + Тест, что protocol возвращает http для не-локального режима. + """ + with patch.dict("os.environ", {"LOCAL": "false"}): + settings = BaseAppSettings() + + assert settings.protocol == "http" + + def test_protocol_returns_https_when_local(self): + """ + Тест, что protocol возвращает https для локального режима. + """ + with patch.dict("os.environ", {"LOCAL": "true"}): + settings = BaseAppSettings() + + assert settings.protocol == "https" + + def test_loads_from_env(self): + """ + Тест загрузки из переменных окружения. + """ + with patch.dict("os.environ", {"LOCAL": "true", "DEBUG": "true"}): + settings = BaseAppSettings() + + assert settings.local is True + assert settings.debug is True + + +@pytest.mark.unit +class TestAppSettings: + """ + Unit тесты для AppSettings. + """ + + def test_default_values(self): + """ + Тест дефолтных значений. + """ + settings = AppSettings() + + assert settings.app_host == "0.0.0.0" + assert settings.app_port == 8081 + assert settings.kube_net_name == "AIGATEWAY" + assert settings.timezone == "Europe/Moscow" + + def test_loads_from_env(self): + """ + Тест загрузки из переменных окружения. + """ + with patch.dict("os.environ", { + "APP_HOST": "127.0.0.1", + "APP_PORT": "9000", + "PROJECT_NAME": "TestProject", + "TIMEZONE": "UTC" + }): + settings = AppSettings() + + assert settings.app_host == "127.0.0.1" + assert settings.app_port == 9000 + assert settings.kube_net_name == "TestProject" + assert settings.timezone == "UTC" + + +@pytest.mark.unit +class TestLogSettings: + """ + Unit тесты для LogSettings. + """ + + def test_default_values(self): + """ + Тест дефолтных значений. + """ + settings = LogSettings() + + assert settings.private_log_file_name == "app.log" + assert settings.log_rotation == "10 MB" + assert settings.private_metric_file_name == "app-metric.log" + assert settings.private_audit_file_name == "events.log" + assert settings.audit_host_ip == "127.0.0.1" + + def test_get_file_abs_path_joins_path_and_file(self): + """ + Тест объединения пути и имени файла. + """ + path = LogSettings.get_file_abs_path("/var/log/", "app.log") + + assert "app.log" in path + assert path.startswith("var") + + def test_get_file_abs_path_handles_trailing_slashes(self): + """ + Тест обработки слэшей в путях. + """ + path = LogSettings.get_file_abs_path("/var/log///", "///app.log") + + assert "app.log" in path + assert path.startswith("var") + assert not path.startswith("/") + + def test_log_file_abs_path_property(self): + """ + Тест свойства log_file_abs_path. + """ + with patch.dict("os.environ", {"LOG_PATH": "/var/log", "LOG_FILE_NAME": "test.log"}): + settings = LogSettings() + + assert "test.log" in settings.log_file_abs_path + assert settings.log_file_abs_path.startswith("var") + + def test_metric_file_abs_path_property(self): + """ + Тест свойства metric_file_abs_path. + """ + with patch.dict("os.environ", {"METRIC_PATH": "/var/metrics", "METRIC_FILE_NAME": "metrics.log"}): + settings = LogSettings() + + assert "metrics.log" in settings.metric_file_abs_path + assert settings.metric_file_abs_path.startswith("var") + + def test_audit_file_abs_path_property(self): + """ + Тест свойства audit_file_abs_path. + """ + with patch.dict("os.environ", {"AUDIT_LOG_PATH": "/var/audit", "AUDIT_LOG_FILE_NAME": "audit.log"}): + settings = LogSettings() + + assert "audit.log" in settings.audit_file_abs_path + assert settings.audit_file_abs_path.startswith("var") + + def test_log_lvl_returns_debug_when_debug_enabled(self): + """ + Тест, что log_lvl возвращает DEBUG в debug-режиме. + """ + with patch.dict("os.environ", {"DEBUG": "true"}): + settings = LogSettings() + + assert settings.log_lvl == DEBUG + + def test_log_lvl_returns_info_when_debug_disabled(self): + """ + Тест, что log_lvl возвращает INFO в обычном режиме. + """ + with patch.dict("os.environ", {"DEBUG": "false"}): + settings = LogSettings() + + assert settings.log_lvl == INFO + + +@pytest.mark.unit +class TestPGSettings: + """ + Unit тесты для PGSettings. + """ + + def test_default_values(self): + """ + Тест дефолтных значений. + """ + with patch.dict("os.environ", {}, clear=True): + settings = PGSettings() + + assert settings.host == "localhost" + assert settings.port == 5432 + assert settings.user == "postgres" + assert settings.password == "" + assert settings.database == "postgres" + assert settings.schema_queue == "public" + assert settings.use_pool is True + assert settings.pool_size == 5 + assert settings.max_overflow == 10 + + def test_url_property_returns_connection_string(self): + """ + Тест формирования строки подключения. + """ + with patch.dict("os.environ", { + "PG_HOST": "db.example.com", + "PG_PORT": "5433", + "PG_USER": "testuser", + "PG_PASSWORD": "testpass", + "PG_DATABASE": "testdb" + }): + settings = PGSettings() + + expected = "postgresql+asyncpg://testuser:testpass@db.example.com:5433/testdb" + assert settings.url == expected + + def test_url_property_with_empty_password(self): + """ + Тест строки подключения с пустым паролем. + """ + with patch.dict("os.environ", { + "PG_HOST": "localhost", + "PG_PORT": "5432", + "PG_USER": "postgres", + "PG_PASSWORD": "", + "PG_DATABASE": "testdb" + }): + settings = PGSettings() + + expected = "postgresql+asyncpg://postgres:@localhost:5432/testdb" + assert settings.url == expected + + def test_loads_from_env(self): + """ + Тест загрузки из переменных окружения. + """ + with patch.dict("os.environ", { + "PG_HOST": "testhost", + "PG_PORT": "5433", + "PG_USER": "testuser", + "PG_PASSWORD": "testpass", + "PG_DATABASE": "testdb", + "PG_SCHEMA_QUEUE": "queue_schema", + "PG_POOL_SIZE": "20" + }): + settings = PGSettings() + + assert settings.host == "testhost" + assert settings.port == 5433 + assert settings.user == "testuser" + assert settings.password == "testpass" + assert settings.database == "testdb" + assert settings.schema_queue == "queue_schema" + assert settings.pool_size == 20 + + +@pytest.mark.unit +class TestWorkerSettings: + """ + Unit тесты для WorkerSettings. + """ + + def test_default_values(self): + """ + Тест дефолтных значений. + """ + with patch.dict("os.environ", {"WORKERS_JSON": "[]"}, clear=True): + settings = WorkerSettings() + + assert settings.workers_json == "[]" + assert settings.heartbeat_sec == 10 + assert settings.default_lease_ttl_sec == 60 + assert settings.reaper_period_sec == 10 + assert settings.claim_backoff_sec == 15 + + def test_parsed_workers_returns_empty_list_for_default(self): + """ + Тест, что parsed_workers возвращает пустой список по умолчанию. + """ + with patch.dict("os.environ", {"WORKERS_JSON": "[]"}): + settings = WorkerSettings() + + assert settings.parsed_workers() == [] + + def test_parsed_workers_parses_valid_json(self): + """ + Тест парсинга валидного JSON. + """ + workers_json = json.dumps([ + {"queue": "queue1", "concurrency": 2}, + {"queue": "queue2", "concurrency": 3} + ]) + with patch.dict("os.environ", {"WORKERS_JSON": workers_json}): + settings = WorkerSettings() + + workers = settings.parsed_workers() + + assert len(workers) == 2 + assert workers[0]["queue"] == "queue1" + assert workers[0]["concurrency"] == 2 + assert workers[1]["queue"] == "queue2" + assert workers[1]["concurrency"] == 3 + + def test_parsed_workers_filters_non_dict_items(self): + """ + Тест фильтрации не-словарей из JSON. + """ + workers_json = json.dumps([ + {"queue": "queue1", "concurrency": 2}, + "invalid_item", + 123, + {"queue": "queue2", "concurrency": 3} + ]) + with patch.dict("os.environ", {"WORKERS_JSON": workers_json}): + settings = WorkerSettings() + + workers = settings.parsed_workers() + + assert len(workers) == 2 + assert all(isinstance(w, dict) for w in workers) + + def test_parsed_workers_handles_invalid_json(self): + """ + Тест обработки невалидного JSON. + """ + with patch.dict("os.environ", {"WORKERS_JSON": "not valid json"}): + settings = WorkerSettings() + + workers = settings.parsed_workers() + + assert workers == [] + + def test_parsed_workers_handles_empty_string(self): + """ + Тест обработки пустой строки. + """ + with patch.dict("os.environ", {"WORKERS_JSON": ""}): + settings = WorkerSettings() + + workers = settings.parsed_workers() + + assert workers == [] + + def test_parsed_workers_handles_null_json(self): + """ + Тест обработки null в JSON. + """ + with patch.dict("os.environ", {"WORKERS_JSON": "null"}): + settings = WorkerSettings() + + workers = settings.parsed_workers() + + assert workers == [] + + +@pytest.mark.unit +class TestSecrets: + """ + Unit тесты для Secrets. + """ + + def test_initializes_all_settings(self): + """ + Тест, что Secrets инициализирует все настройки. + """ + secrets = Secrets() + + assert isinstance(secrets.app, AppSettings) + assert isinstance(secrets.log, LogSettings) + assert isinstance(secrets.pg, PGSettings) + assert isinstance(secrets.worker, WorkerSettings) + + def test_all_settings_have_default_values(self): + """ + Тест, что все настройки имеют дефолтные значения. + """ + secrets = Secrets() + + assert secrets.app.app_host == "0.0.0.0" + assert secrets.log.private_log_file_name == "app.log" + assert secrets.pg.host == "localhost" + assert secrets.worker.heartbeat_sec == 10 diff --git a/tests/unit/test_context.py b/tests/unit/test_context.py new file mode 100644 index 0000000..11aa5bd --- /dev/null +++ b/tests/unit/test_context.py @@ -0,0 +1,200 @@ +# tests/unit/test_context.py +from __future__ import annotations + +from unittest.mock import AsyncMock, Mock, patch +import pytest + +from dataloader.context import AppContext, get_session + + +@pytest.mark.unit +class TestAppContext: + """ + Unit тесты для AppContext. + """ + + def test_init_creates_empty_context(self): + """ + Тест создания пустого контекста. + """ + ctx = AppContext() + + assert ctx._engine is None + assert ctx._sessionmaker is None + assert ctx._context_vars_container is not None + + def test_engine_property_raises_when_not_initialized(self): + """ + Тест, что engine выбрасывает RuntimeError, если не инициализирован. + """ + ctx = AppContext() + + with pytest.raises(RuntimeError, match="Database engine is not initialized"): + _ = ctx.engine + + def test_engine_property_returns_engine_when_initialized(self): + """ + Тест, что engine возвращает движок после инициализации. + """ + ctx = AppContext() + mock_engine = Mock() + ctx._engine = mock_engine + + assert ctx.engine == mock_engine + + def test_sessionmaker_property_raises_when_not_initialized(self): + """ + Тест, что sessionmaker выбрасывает RuntimeError, если не инициализирован. + """ + ctx = AppContext() + + with pytest.raises(RuntimeError, match="Sessionmaker is not initialized"): + _ = ctx.sessionmaker + + def test_sessionmaker_property_returns_sessionmaker_when_initialized(self): + """ + Тест, что sessionmaker возвращает фабрику после инициализации. + """ + ctx = AppContext() + mock_sm = Mock() + ctx._sessionmaker = mock_sm + + assert ctx.sessionmaker == mock_sm + + @pytest.mark.asyncio + async def test_on_startup_initializes_engine_and_sessionmaker(self): + """ + Тест, что on_startup инициализирует engine и sessionmaker. + """ + ctx = AppContext() + + mock_engine = Mock() + mock_sm = Mock() + + with patch("dataloader.logger.logger.setup_logging") as mock_setup_logging, \ + patch("dataloader.storage.engine.create_engine", return_value=mock_engine) as mock_create_engine, \ + patch("dataloader.storage.engine.create_sessionmaker", return_value=mock_sm) as mock_create_sm, \ + patch("dataloader.context.APP_CONFIG") as mock_config: + + mock_config.pg.url = "postgresql://test" + + await ctx.on_startup() + + mock_setup_logging.assert_called_once() + mock_create_engine.assert_called_once_with("postgresql://test") + mock_create_sm.assert_called_once_with(mock_engine) + assert ctx._engine == mock_engine + assert ctx._sessionmaker == mock_sm + + @pytest.mark.asyncio + async def test_on_shutdown_disposes_engine(self): + """ + Тест, что on_shutdown закрывает движок БД. + """ + ctx = AppContext() + + mock_engine = AsyncMock() + ctx._engine = mock_engine + + await ctx.on_shutdown() + + mock_engine.dispose.assert_called_once() + + @pytest.mark.asyncio + async def test_on_shutdown_does_nothing_when_no_engine(self): + """ + Тест, что on_shutdown безопасно работает без движка. + """ + ctx = AppContext() + + await ctx.on_shutdown() + + assert ctx._engine is None + + def test_get_logger_returns_logger(self): + """ + Тест получения логгера. + """ + ctx = AppContext() + + with patch("dataloader.logger.logger.get_logger") as mock_get_logger: + mock_logger = Mock() + mock_get_logger.return_value = mock_logger + + logger = ctx.get_logger("test_module") + + mock_get_logger.assert_called_once_with("test_module") + assert logger == mock_logger + + def test_get_logger_without_name(self): + """ + Тест получения логгера без указания имени. + """ + ctx = AppContext() + + with patch("dataloader.logger.logger.get_logger") as mock_get_logger: + mock_logger = Mock() + mock_get_logger.return_value = mock_logger + + logger = ctx.get_logger() + + mock_get_logger.assert_called_once_with(None) + assert logger == mock_logger + + def test_get_context_vars_container_returns_container(self): + """ + Тест получения контейнера контекстных переменных. + """ + ctx = AppContext() + + container = ctx.get_context_vars_container() + + assert container == ctx._context_vars_container + + +@pytest.mark.unit +class TestGetSession: + """ + Unit тесты для get_session dependency. + """ + + @pytest.mark.asyncio + async def test_get_session_yields_session(self): + """ + Тест, что get_session возвращает сессию. + """ + mock_session = AsyncMock() + + mock_context_manager = AsyncMock() + mock_context_manager.__aenter__.return_value = mock_session + mock_context_manager.__aexit__.return_value = None + + mock_sm = Mock(return_value=mock_context_manager) + + with patch("dataloader.context.APP_CTX") as mock_ctx: + mock_ctx.sessionmaker = mock_sm + + async for session in get_session(): + assert session == mock_session + + @pytest.mark.asyncio + async def test_get_session_closes_session_after_use(self): + """ + Тест, что get_session закрывает сессию после использования. + """ + mock_session = AsyncMock() + mock_exit = AsyncMock(return_value=None) + + mock_context_manager = AsyncMock() + mock_context_manager.__aenter__.return_value = mock_session + mock_context_manager.__aexit__ = mock_exit + + mock_sm = Mock(return_value=mock_context_manager) + + with patch("dataloader.context.APP_CTX") as mock_ctx: + mock_ctx.sessionmaker = mock_sm + + async for session in get_session(): + pass + + assert mock_exit.call_count == 1 diff --git a/tests/unit/test_notify_listener.py b/tests/unit/test_notify_listener.py new file mode 100644 index 0000000..2fdc6d2 --- /dev/null +++ b/tests/unit/test_notify_listener.py @@ -0,0 +1,377 @@ +# tests/unit/test_notify_listener.py +from __future__ import annotations + +import asyncio +from unittest.mock import AsyncMock, Mock, patch +import pytest + +from dataloader.storage.notify_listener import PGNotifyListener + + +@pytest.mark.unit +class TestPGNotifyListener: + """ + Unit тесты для PGNotifyListener. + """ + + def test_init_creates_listener_with_config(self): + """ + Тест создания listener'а с конфигурацией. + """ + callback = Mock() + stop_event = asyncio.Event() + + listener = PGNotifyListener( + dsn="postgresql://test", + queue="test_queue", + callback=callback, + stop_event=stop_event + ) + + assert listener._dsn == "postgresql://test" + assert listener._queue == "test_queue" + assert listener._callback == callback + assert listener._stop == stop_event + assert listener._conn is None + assert listener._task is None + + @pytest.mark.asyncio + async def test_start_establishes_connection_and_listens(self): + """ + Тест запуска прослушивания NOTIFY. + """ + callback = Mock() + stop_event = asyncio.Event() + + listener = PGNotifyListener( + dsn="postgresql://test", + queue="test_queue", + callback=callback, + stop_event=stop_event + ) + + mock_conn = AsyncMock() + mock_conn.execute = AsyncMock() + mock_conn.add_listener = AsyncMock() + + with patch("dataloader.storage.notify_listener.asyncpg.connect", return_value=mock_conn): + await listener.start() + + assert listener._conn == mock_conn + assert listener._task is not None + mock_conn.execute.assert_called_once_with("LISTEN dl_jobs") + mock_conn.add_listener.assert_called_once() + + await listener.stop() + + @pytest.mark.asyncio + async def test_start_converts_asyncpg_dsn_format(self): + """ + Тест преобразования DSN из формата SQLAlchemy в asyncpg. + """ + callback = Mock() + stop_event = asyncio.Event() + + listener = PGNotifyListener( + dsn="postgresql+asyncpg://test", + queue="test_queue", + callback=callback, + stop_event=stop_event + ) + + mock_conn = AsyncMock() + mock_conn.execute = AsyncMock() + mock_conn.add_listener = AsyncMock() + + with patch("dataloader.storage.notify_listener.asyncpg.connect", return_value=mock_conn) as mock_connect: + await listener.start() + + mock_connect.assert_called_once_with("postgresql://test") + + await listener.stop() + + @pytest.mark.asyncio + async def test_on_notify_handler_calls_callback_for_matching_queue(self): + """ + Тест, что callback вызывается для совпадающей очереди. + """ + callback = Mock() + stop_event = asyncio.Event() + + listener = PGNotifyListener( + dsn="postgresql://test", + queue="test_queue", + callback=callback, + stop_event=stop_event + ) + + mock_conn = AsyncMock() + mock_conn.execute = AsyncMock() + mock_conn.add_listener = AsyncMock() + + with patch("dataloader.storage.notify_listener.asyncpg.connect", return_value=mock_conn): + await listener.start() + + handler = listener._on_notify_handler + handler(mock_conn, 12345, "dl_jobs", "test_queue") + + callback.assert_called_once() + + await listener.stop() + + @pytest.mark.asyncio + async def test_on_notify_handler_ignores_wrong_channel(self): + """ + Тест, что callback не вызывается для другого канала. + """ + callback = Mock() + stop_event = asyncio.Event() + + listener = PGNotifyListener( + dsn="postgresql://test", + queue="test_queue", + callback=callback, + stop_event=stop_event + ) + + mock_conn = AsyncMock() + mock_conn.execute = AsyncMock() + mock_conn.add_listener = AsyncMock() + + with patch("dataloader.storage.notify_listener.asyncpg.connect", return_value=mock_conn): + await listener.start() + + handler = listener._on_notify_handler + handler(mock_conn, 12345, "other_channel", "test_queue") + + callback.assert_not_called() + + await listener.stop() + + @pytest.mark.asyncio + async def test_on_notify_handler_ignores_wrong_queue(self): + """ + Тест, что callback не вызывается для другой очереди. + """ + callback = Mock() + stop_event = asyncio.Event() + + listener = PGNotifyListener( + dsn="postgresql://test", + queue="test_queue", + callback=callback, + stop_event=stop_event + ) + + mock_conn = AsyncMock() + mock_conn.execute = AsyncMock() + mock_conn.add_listener = AsyncMock() + + with patch("dataloader.storage.notify_listener.asyncpg.connect", return_value=mock_conn): + await listener.start() + + handler = listener._on_notify_handler + handler(mock_conn, 12345, "dl_jobs", "other_queue") + + callback.assert_not_called() + + await listener.stop() + + @pytest.mark.asyncio + async def test_on_notify_handler_suppresses_callback_exceptions(self): + """ + Тест, что исключения в callback не ломают listener. + """ + callback = Mock(side_effect=Exception("Callback error")) + stop_event = asyncio.Event() + + listener = PGNotifyListener( + dsn="postgresql://test", + queue="test_queue", + callback=callback, + stop_event=stop_event + ) + + mock_conn = AsyncMock() + mock_conn.execute = AsyncMock() + mock_conn.add_listener = AsyncMock() + + with patch("dataloader.storage.notify_listener.asyncpg.connect", return_value=mock_conn): + await listener.start() + + handler = listener._on_notify_handler + handler(mock_conn, 12345, "dl_jobs", "test_queue") + + callback.assert_called_once() + + await listener.stop() + + @pytest.mark.asyncio + async def test_monitor_connection_waits_for_stop_event(self): + """ + Тест, что _monitor_connection ждёт stop_event. + """ + callback = Mock() + stop_event = asyncio.Event() + + listener = PGNotifyListener( + dsn="postgresql://test", + queue="test_queue", + callback=callback, + stop_event=stop_event + ) + + mock_conn = AsyncMock() + mock_conn.execute = AsyncMock() + mock_conn.add_listener = AsyncMock() + mock_conn.remove_listener = AsyncMock() + mock_conn.close = AsyncMock() + + with patch("dataloader.storage.notify_listener.asyncpg.connect", return_value=mock_conn): + await listener.start() + + assert listener._task is not None + assert not listener._task.done() + + stop_event.set() + await asyncio.sleep(0.1) + + assert listener._task.done() + + @pytest.mark.asyncio + async def test_stop_cancels_task_and_closes_connection(self): + """ + Тест остановки listener'а с закрытием соединения. + """ + callback = Mock() + stop_event = asyncio.Event() + + listener = PGNotifyListener( + dsn="postgresql://test", + queue="test_queue", + callback=callback, + stop_event=stop_event + ) + + mock_conn = AsyncMock() + mock_conn.execute = AsyncMock() + mock_conn.add_listener = AsyncMock() + mock_conn.remove_listener = AsyncMock() + mock_conn.close = AsyncMock() + + with patch("dataloader.storage.notify_listener.asyncpg.connect", return_value=mock_conn): + await listener.start() + + await listener.stop() + + mock_conn.remove_listener.assert_called_once() + mock_conn.close.assert_called_once() + assert listener._conn is None + + @pytest.mark.asyncio + async def test_stop_handles_already_stopped_task(self): + """ + Тест stop() для уже остановленной задачи. + """ + callback = Mock() + stop_event = asyncio.Event() + + listener = PGNotifyListener( + dsn="postgresql://test", + queue="test_queue", + callback=callback, + stop_event=stop_event + ) + + mock_conn = AsyncMock() + mock_conn.execute = AsyncMock() + mock_conn.add_listener = AsyncMock() + mock_conn.remove_listener = AsyncMock() + mock_conn.close = AsyncMock() + + with patch("dataloader.storage.notify_listener.asyncpg.connect", return_value=mock_conn): + await listener.start() + + stop_event.set() + await asyncio.sleep(0.1) + + await listener.stop() + + assert listener._conn is None + + @pytest.mark.asyncio + async def test_stop_handles_remove_listener_exception(self): + """ + Тест, что stop() подавляет исключения при remove_listener. + """ + callback = Mock() + stop_event = asyncio.Event() + + listener = PGNotifyListener( + dsn="postgresql://test", + queue="test_queue", + callback=callback, + stop_event=stop_event + ) + + mock_conn = AsyncMock() + mock_conn.execute = AsyncMock() + mock_conn.add_listener = AsyncMock() + mock_conn.remove_listener = AsyncMock(side_effect=Exception("Remove error")) + mock_conn.close = AsyncMock() + + with patch("dataloader.storage.notify_listener.asyncpg.connect", return_value=mock_conn): + await listener.start() + + await listener.stop() + + mock_conn.close.assert_called_once() + assert listener._conn is None + + @pytest.mark.asyncio + async def test_stop_handles_close_exception(self): + """ + Тест, что stop() подавляет исключения при close. + """ + callback = Mock() + stop_event = asyncio.Event() + + listener = PGNotifyListener( + dsn="postgresql://test", + queue="test_queue", + callback=callback, + stop_event=stop_event + ) + + mock_conn = AsyncMock() + mock_conn.execute = AsyncMock() + mock_conn.add_listener = AsyncMock() + mock_conn.remove_listener = AsyncMock() + mock_conn.close = AsyncMock(side_effect=Exception("Close error")) + + with patch("dataloader.storage.notify_listener.asyncpg.connect", return_value=mock_conn): + await listener.start() + + await listener.stop() + + assert listener._conn is None + + @pytest.mark.asyncio + async def test_stop_without_connection_does_nothing(self): + """ + Тест stop() без активного соединения. + """ + callback = Mock() + stop_event = asyncio.Event() + + listener = PGNotifyListener( + dsn="postgresql://test", + queue="test_queue", + callback=callback, + stop_event=stop_event + ) + + await listener.stop() + + assert listener._conn is None + assert listener._task is None diff --git a/tests/unit/test_pipeline_registry.py b/tests/unit/test_pipeline_registry.py new file mode 100644 index 0000000..50c8a67 --- /dev/null +++ b/tests/unit/test_pipeline_registry.py @@ -0,0 +1,82 @@ +# tests/unit/test_pipeline_registry.py +from __future__ import annotations + +import pytest + +from dataloader.workers.pipelines.registry import register, resolve, tasks, _Registry + + +@pytest.mark.unit +class TestPipelineRegistry: + """ + Unit тесты для системы регистрации пайплайнов. + """ + + def setup_method(self): + """ + Очищаем реестр перед каждым тестом. + """ + _Registry.clear() + + def test_register_adds_pipeline_to_registry(self): + """ + Тест регистрации пайплайна. + """ + @register("test.task") + def test_pipeline(args: dict): + return "result" + + assert "test.task" in _Registry + assert _Registry["test.task"] == test_pipeline + + def test_resolve_returns_registered_pipeline(self): + """ + Тест получения зарегистрированного пайплайна. + """ + @register("test.resolve") + def test_pipeline(args: dict): + return "resolved" + + resolved = resolve("test.resolve") + assert resolved == test_pipeline + assert resolved({}) == "resolved" + + def test_resolve_raises_keyerror_for_unknown_task(self): + """ + Тест ошибки при запросе незарегистрированного пайплайна. + """ + with pytest.raises(KeyError) as exc_info: + resolve("unknown.task") + + assert "pipeline not found: unknown.task" in str(exc_info.value) + + def test_tasks_returns_registered_task_names(self): + """ + Тест получения списка зарегистрированных задач. + """ + @register("task1") + def pipeline1(args: dict): + pass + + @register("task2") + def pipeline2(args: dict): + pass + + task_list = list(tasks()) + assert "task1" in task_list + assert "task2" in task_list + + def test_register_overwrites_existing_pipeline(self): + """ + Тест перезаписи существующего пайплайна. + """ + @register("overwrite.task") + def first_pipeline(args: dict): + return "first" + + @register("overwrite.task") + def second_pipeline(args: dict): + return "second" + + resolved = resolve("overwrite.task") + assert resolved({}) == "second" diff --git a/tests/unit/test_workers_base.py b/tests/unit/test_workers_base.py new file mode 100644 index 0000000..6e8adb5 --- /dev/null +++ b/tests/unit/test_workers_base.py @@ -0,0 +1,446 @@ +# tests/unit/test_workers_base.py +from __future__ import annotations + +import asyncio +from datetime import datetime, timezone +from unittest.mock import AsyncMock, MagicMock, Mock, patch +import pytest + +from dataloader.workers.base import PGWorker, WorkerConfig + + +@pytest.mark.unit +class TestPGWorker: + """ + Unit тесты для PGWorker. + """ + + def test_init_creates_worker_with_config(self): + """ + Тест создания воркера с конфигурацией. + """ + cfg = WorkerConfig(queue="test_queue", heartbeat_sec=10, claim_backoff_sec=5) + stop_event = asyncio.Event() + + with patch("dataloader.workers.base.APP_CTX") as mock_ctx: + mock_ctx.get_logger.return_value = Mock() + mock_ctx.sessionmaker = Mock() + + worker = PGWorker(cfg, stop_event) + + assert worker._cfg == cfg + assert worker._stop == stop_event + assert worker._listener is None + assert not worker._notify_wakeup.is_set() + + @pytest.mark.asyncio + async def test_run_starts_listener_and_processes_jobs(self): + """ + Тест запуска воркера с listener'ом. + """ + cfg = WorkerConfig(queue="test", heartbeat_sec=10, claim_backoff_sec=1) + stop_event = asyncio.Event() + + with patch("dataloader.workers.base.APP_CTX") as mock_ctx, \ + patch("dataloader.workers.base.APP_CONFIG") as mock_cfg, \ + patch("dataloader.workers.base.PGNotifyListener") as mock_listener_cls: + + mock_ctx.get_logger.return_value = Mock() + mock_ctx.sessionmaker = Mock() + mock_cfg.pg.url = "postgresql+asyncpg://test" + + mock_listener = Mock() + mock_listener.start = AsyncMock() + mock_listener.stop = AsyncMock() + mock_listener_cls.return_value = mock_listener + + worker = PGWorker(cfg, stop_event) + + call_count = [0] + + async def mock_claim(): + call_count[0] += 1 + if call_count[0] >= 2: + stop_event.set() + return False + + with patch.object(worker, "_claim_and_execute_once", side_effect=mock_claim): + await worker.run() + + assert mock_listener.start.call_count == 1 + assert mock_listener.stop.call_count == 1 + + @pytest.mark.asyncio + async def test_run_falls_back_to_polling_if_listener_fails(self): + """ + Тест fallback на polling, если LISTEN/NOTIFY не запустился. + """ + cfg = WorkerConfig(queue="test", heartbeat_sec=10, claim_backoff_sec=1) + stop_event = asyncio.Event() + + with patch("dataloader.workers.base.APP_CTX") as mock_ctx, \ + patch("dataloader.workers.base.APP_CONFIG") as mock_cfg, \ + patch("dataloader.workers.base.PGNotifyListener") as mock_listener_cls: + + mock_logger = Mock() + mock_ctx.get_logger.return_value = mock_logger + mock_ctx.sessionmaker = Mock() + mock_cfg.pg.url = "postgresql+asyncpg://test" + + mock_listener = Mock() + mock_listener.start = AsyncMock(side_effect=Exception("Connection failed")) + mock_listener_cls.return_value = mock_listener + + worker = PGWorker(cfg, stop_event) + + call_count = [0] + + async def mock_claim(): + call_count[0] += 1 + if call_count[0] >= 2: + stop_event.set() + return False + + with patch.object(worker, "_claim_and_execute_once", side_effect=mock_claim): + await worker.run() + + assert worker._listener is None + assert mock_logger.warning.call_count == 1 + + @pytest.mark.asyncio + async def test_listen_or_sleep_with_listener_waits_for_notify(self): + """ + Тест ожидания через LISTEN/NOTIFY. + """ + cfg = WorkerConfig(queue="test", heartbeat_sec=10, claim_backoff_sec=5) + stop_event = asyncio.Event() + + with patch("dataloader.workers.base.APP_CTX") as mock_ctx: + mock_ctx.get_logger.return_value = Mock() + mock_ctx.sessionmaker = Mock() + + worker = PGWorker(cfg, stop_event) + worker._listener = Mock() + + worker._notify_wakeup.set() + + await worker._listen_or_sleep(1) + + assert not worker._notify_wakeup.is_set() + + @pytest.mark.asyncio + async def test_listen_or_sleep_without_listener_uses_timeout(self): + """ + Тест fallback на таймаут без listener'а. + """ + cfg = WorkerConfig(queue="test", heartbeat_sec=10, claim_backoff_sec=1) + stop_event = asyncio.Event() + + with patch("dataloader.workers.base.APP_CTX") as mock_ctx: + mock_ctx.get_logger.return_value = Mock() + mock_ctx.sessionmaker = Mock() + + worker = PGWorker(cfg, stop_event) + + start_time = asyncio.get_event_loop().time() + await worker._listen_or_sleep(1) + elapsed = asyncio.get_event_loop().time() - start_time + + assert elapsed >= 1.0 + + @pytest.mark.asyncio + async def test_claim_and_execute_once_returns_false_when_no_job(self): + """ + Тест, что claim_and_execute_once возвращает False, если задач нет. + """ + cfg = WorkerConfig(queue="test", heartbeat_sec=10, claim_backoff_sec=5) + stop_event = asyncio.Event() + + with patch("dataloader.workers.base.APP_CTX") as mock_ctx, \ + patch("dataloader.workers.base.QueueRepository") as mock_repo_cls: + + mock_session = AsyncMock() + mock_session.commit = AsyncMock() + mock_sm = MagicMock() + mock_sm.return_value.__aenter__.return_value = mock_session + mock_sm.return_value.__aexit__.return_value = AsyncMock() + mock_ctx.get_logger.return_value = Mock() + mock_ctx.sessionmaker = mock_sm + + mock_repo = Mock() + mock_repo.claim_one = AsyncMock(return_value=None) + mock_repo_cls.return_value = mock_repo + + worker = PGWorker(cfg, stop_event) + result = await worker._claim_and_execute_once() + + assert result is False + assert mock_session.commit.call_count == 1 + + @pytest.mark.asyncio + async def test_claim_and_execute_once_executes_job_successfully(self): + """ + Тест успешного выполнения задачи. + """ + cfg = WorkerConfig(queue="test", heartbeat_sec=10, claim_backoff_sec=5) + stop_event = asyncio.Event() + + with patch("dataloader.workers.base.APP_CTX") as mock_ctx, \ + patch("dataloader.workers.base.QueueRepository") as mock_repo_cls: + + mock_session = AsyncMock() + mock_sm = MagicMock() + mock_sm.return_value.__aenter__.return_value = mock_session + mock_sm.return_value.__aexit__.return_value = AsyncMock() + mock_ctx.get_logger.return_value = Mock() + mock_ctx.sessionmaker = mock_sm + + mock_repo = Mock() + mock_repo.claim_one = AsyncMock(return_value={ + "job_id": "test-job-id", + "lease_ttl_sec": 60, + "task": "test.task", + "args": {"key": "value"} + }) + mock_repo.finish_ok = AsyncMock() + mock_repo_cls.return_value = mock_repo + + worker = PGWorker(cfg, stop_event) + + async def mock_pipeline(task, args): + yield + + with patch.object(worker, "_pipeline", side_effect=mock_pipeline), \ + patch.object(worker, "_execute_with_heartbeat", return_value=False): + result = await worker._claim_and_execute_once() + + assert result is True + assert mock_repo.finish_ok.call_count == 1 + + @pytest.mark.asyncio + async def test_claim_and_execute_once_handles_cancellation(self): + """ + Тест обработки отмены задачи пользователем. + """ + cfg = WorkerConfig(queue="test", heartbeat_sec=10, claim_backoff_sec=5) + stop_event = asyncio.Event() + + with patch("dataloader.workers.base.APP_CTX") as mock_ctx, \ + patch("dataloader.workers.base.QueueRepository") as mock_repo_cls: + + mock_session = AsyncMock() + mock_sm = MagicMock() + mock_sm.return_value.__aenter__.return_value = mock_session + mock_sm.return_value.__aexit__.return_value = AsyncMock() + mock_ctx.get_logger.return_value = Mock() + mock_ctx.sessionmaker = mock_sm + + mock_repo = Mock() + mock_repo.claim_one = AsyncMock(return_value={ + "job_id": "test-job-id", + "lease_ttl_sec": 60, + "task": "test.task", + "args": {} + }) + mock_repo.finish_fail_or_retry = AsyncMock() + mock_repo_cls.return_value = mock_repo + + worker = PGWorker(cfg, stop_event) + + with patch.object(worker, "_execute_with_heartbeat", return_value=True): + result = await worker._claim_and_execute_once() + + assert result is True + mock_repo.finish_fail_or_retry.assert_called_once() + args = mock_repo.finish_fail_or_retry.call_args + assert "canceled by user" in args[0] + + @pytest.mark.asyncio + async def test_claim_and_execute_once_handles_exceptions(self): + """ + Тест обработки исключений при выполнении задачи. + """ + cfg = WorkerConfig(queue="test", heartbeat_sec=10, claim_backoff_sec=5) + stop_event = asyncio.Event() + + with patch("dataloader.workers.base.APP_CTX") as mock_ctx, \ + patch("dataloader.workers.base.QueueRepository") as mock_repo_cls: + + mock_session = AsyncMock() + mock_sm = MagicMock() + mock_sm.return_value.__aenter__.return_value = mock_session + mock_sm.return_value.__aexit__.return_value = AsyncMock() + mock_ctx.get_logger.return_value = Mock() + mock_ctx.sessionmaker = mock_sm + + mock_repo = Mock() + mock_repo.claim_one = AsyncMock(return_value={ + "job_id": "test-job-id", + "lease_ttl_sec": 60, + "task": "test.task", + "args": {} + }) + mock_repo.finish_fail_or_retry = AsyncMock() + mock_repo_cls.return_value = mock_repo + + worker = PGWorker(cfg, stop_event) + + with patch.object(worker, "_execute_with_heartbeat", side_effect=ValueError("Test error")): + result = await worker._claim_and_execute_once() + + assert result is True + mock_repo.finish_fail_or_retry.assert_called_once() + args = mock_repo.finish_fail_or_retry.call_args + assert "Test error" in args[0] + + @pytest.mark.asyncio + async def test_execute_with_heartbeat_sends_heartbeats(self): + """ + Тест отправки heartbeat'ов во время выполнения задачи. + """ + cfg = WorkerConfig(queue="test", heartbeat_sec=1, claim_backoff_sec=5) + stop_event = asyncio.Event() + + with patch("dataloader.workers.base.APP_CTX") as mock_ctx, \ + patch("dataloader.workers.base.QueueRepository") as mock_repo_cls: + + mock_session = AsyncMock() + mock_sm = MagicMock() + mock_sm.return_value.__aenter__.return_value = mock_session + mock_sm.return_value.__aexit__.return_value = AsyncMock() + mock_ctx.get_logger.return_value = Mock() + mock_ctx.sessionmaker = mock_sm + + mock_repo = Mock() + mock_repo.heartbeat = AsyncMock(return_value=(True, False)) + mock_repo_cls.return_value = mock_repo + + worker = PGWorker(cfg, stop_event) + + async def slow_pipeline(): + await asyncio.sleep(0.5) + yield + await asyncio.sleep(0.6) + yield + + canceled = await worker._execute_with_heartbeat("job-id", 60, slow_pipeline()) + + assert canceled is False + assert mock_repo.heartbeat.call_count >= 1 + + @pytest.mark.asyncio + async def test_execute_with_heartbeat_detects_cancellation(self): + """ + Тест обнаружения отмены через heartbeat. + """ + cfg = WorkerConfig(queue="test", heartbeat_sec=1, claim_backoff_sec=5) + stop_event = asyncio.Event() + + with patch("dataloader.workers.base.APP_CTX") as mock_ctx, \ + patch("dataloader.workers.base.QueueRepository") as mock_repo_cls: + + mock_session = AsyncMock() + mock_sm = MagicMock() + mock_sm.return_value.__aenter__.return_value = mock_session + mock_sm.return_value.__aexit__.return_value = AsyncMock() + mock_ctx.get_logger.return_value = Mock() + mock_ctx.sessionmaker = mock_sm + + mock_repo = Mock() + mock_repo.heartbeat = AsyncMock(return_value=(True, True)) + mock_repo_cls.return_value = mock_repo + + worker = PGWorker(cfg, stop_event) + + async def slow_pipeline(): + await asyncio.sleep(0.5) + yield + await asyncio.sleep(0.6) + yield + + canceled = await worker._execute_with_heartbeat("job-id", 60, slow_pipeline()) + + assert canceled is True + + @pytest.mark.asyncio + async def test_pipeline_handles_sync_function(self): + """ + Тест выполнения синхронной функции-пайплайна. + """ + cfg = WorkerConfig(queue="test", heartbeat_sec=10, claim_backoff_sec=5) + stop_event = asyncio.Event() + + with patch("dataloader.workers.base.APP_CTX") as mock_ctx, \ + patch("dataloader.workers.base.resolve_pipeline") as mock_resolve: + + mock_ctx.get_logger.return_value = Mock() + mock_ctx.sessionmaker = Mock() + + def sync_pipeline(args): + return "result" + + mock_resolve.return_value = sync_pipeline + + worker = PGWorker(cfg, stop_event) + + results = [] + async for _ in worker._pipeline("test.task", {}): + results.append(_) + + assert len(results) == 1 + + @pytest.mark.asyncio + async def test_pipeline_handles_async_function(self): + """ + Тест выполнения асинхронной функции-пайплайна. + """ + cfg = WorkerConfig(queue="test", heartbeat_sec=10, claim_backoff_sec=5) + stop_event = asyncio.Event() + + with patch("dataloader.workers.base.APP_CTX") as mock_ctx, \ + patch("dataloader.workers.base.resolve_pipeline") as mock_resolve: + + mock_ctx.get_logger.return_value = Mock() + mock_ctx.sessionmaker = Mock() + + async def async_pipeline(args): + return "result" + + mock_resolve.return_value = async_pipeline + + worker = PGWorker(cfg, stop_event) + + results = [] + async for _ in worker._pipeline("test.task", {}): + results.append(_) + + assert len(results) == 1 + + @pytest.mark.asyncio + async def test_pipeline_handles_async_generator(self): + """ + Тест выполнения асинхронного генератора-пайплайна. + """ + cfg = WorkerConfig(queue="test", heartbeat_sec=10, claim_backoff_sec=5) + stop_event = asyncio.Event() + + with patch("dataloader.workers.base.APP_CTX") as mock_ctx, \ + patch("dataloader.workers.base.resolve_pipeline") as mock_resolve: + + mock_ctx.get_logger.return_value = Mock() + mock_ctx.sessionmaker = Mock() + + async def async_gen_pipeline(args): + yield + yield + yield + + mock_resolve.return_value = async_gen_pipeline + + worker = PGWorker(cfg, stop_event) + + results = [] + async for _ in worker._pipeline("test.task", {}): + results.append(_) + + assert len(results) == 3 diff --git a/tests/unit/test_workers_manager.py b/tests/unit/test_workers_manager.py new file mode 100644 index 0000000..c8c5485 --- /dev/null +++ b/tests/unit/test_workers_manager.py @@ -0,0 +1,278 @@ +# tests/unit/test_workers_manager.py +from __future__ import annotations + +import asyncio +from unittest.mock import AsyncMock, MagicMock, Mock, patch +import pytest + +from dataloader.workers.manager import WorkerManager, WorkerSpec, build_manager_from_env + + +@pytest.mark.unit +class TestWorkerManager: + """ + Unit тесты для WorkerManager. + """ + + def test_init_creates_manager_with_specs(self): + """ + Тест создания менеджера со спецификациями воркеров. + """ + specs = [WorkerSpec(queue="test_queue", concurrency=2)] + + with patch("dataloader.workers.manager.APP_CTX") as mock_ctx: + mock_ctx.get_logger.return_value = Mock() + manager = WorkerManager(specs) + + assert manager._specs == specs + assert manager._stop.is_set() is False + assert manager._tasks == [] + assert manager._reaper_task is None + + @pytest.mark.asyncio + async def test_start_creates_worker_tasks(self): + """ + Тест старта воркеров и создания задач. + """ + specs = [ + WorkerSpec(queue="queue1", concurrency=2), + WorkerSpec(queue="queue2", concurrency=1), + ] + + with patch("dataloader.workers.manager.APP_CTX") as mock_ctx, \ + patch("dataloader.workers.manager.APP_CONFIG") as mock_cfg, \ + patch("dataloader.workers.manager.PGWorker") as mock_worker_cls: + + mock_ctx.get_logger.return_value = Mock() + mock_cfg.worker.heartbeat_sec = 10 + mock_cfg.worker.claim_backoff_sec = 5 + + mock_worker_instance = Mock() + mock_worker_instance.run = AsyncMock() + mock_worker_cls.return_value = mock_worker_instance + + manager = WorkerManager(specs) + await manager.start() + + assert len(manager._tasks) == 3 + assert manager._reaper_task is not None + assert mock_worker_cls.call_count == 3 + + await manager.stop() + + @pytest.mark.asyncio + async def test_start_with_zero_concurrency_creates_one_worker(self): + """ + Тест, что concurrency=0 создаёт минимум 1 воркер. + """ + specs = [WorkerSpec(queue="test", concurrency=0)] + + with patch("dataloader.workers.manager.APP_CTX") as mock_ctx, \ + patch("dataloader.workers.manager.APP_CONFIG") as mock_cfg, \ + patch("dataloader.workers.manager.PGWorker") as mock_worker_cls: + + mock_ctx.get_logger.return_value = Mock() + mock_cfg.worker.heartbeat_sec = 10 + mock_cfg.worker.claim_backoff_sec = 5 + + mock_worker_instance = Mock() + mock_worker_instance.run = AsyncMock() + mock_worker_cls.return_value = mock_worker_instance + + manager = WorkerManager(specs) + await manager.start() + + assert len(manager._tasks) == 1 + + await manager.stop() + + @pytest.mark.asyncio + async def test_stop_cancels_all_tasks(self): + """ + Тест остановки всех задач воркеров. + """ + specs = [WorkerSpec(queue="test", concurrency=2)] + + with patch("dataloader.workers.manager.APP_CTX") as mock_ctx, \ + patch("dataloader.workers.manager.APP_CONFIG") as mock_cfg, \ + patch("dataloader.workers.manager.PGWorker") as mock_worker_cls: + + mock_ctx.get_logger.return_value = Mock() + mock_cfg.worker.heartbeat_sec = 10 + mock_cfg.worker.claim_backoff_sec = 5 + + mock_worker_instance = Mock() + mock_worker_instance.run = AsyncMock() + mock_worker_cls.return_value = mock_worker_instance + + manager = WorkerManager(specs) + await manager.start() + + initial_task_count = len(manager._tasks) + + await manager.stop() + + assert manager._stop.is_set() + assert len(manager._tasks) == 0 + assert manager._reaper_task is None + + @pytest.mark.asyncio + async def test_reaper_loop_calls_requeue_lost(self): + """ + Тест, что реапер вызывает requeue_lost. + """ + specs = [WorkerSpec(queue="test", concurrency=1)] + + with patch("dataloader.workers.manager.APP_CTX") as mock_ctx, \ + patch("dataloader.workers.manager.APP_CONFIG") as mock_cfg, \ + patch("dataloader.workers.manager.PGWorker") as mock_worker_cls, \ + patch("dataloader.workers.manager.requeue_lost") as mock_requeue: + + mock_logger = Mock() + mock_ctx.get_logger.return_value = mock_logger + mock_cfg.worker.heartbeat_sec = 10 + mock_cfg.worker.claim_backoff_sec = 5 + mock_cfg.worker.reaper_period_sec = 1 + + mock_session = AsyncMock() + mock_sm = MagicMock() + mock_sm.return_value.__aenter__.return_value = mock_session + mock_ctx.sessionmaker = mock_sm + + mock_worker_instance = Mock() + mock_worker_instance.run = AsyncMock() + mock_worker_cls.return_value = mock_worker_instance + + mock_requeue.return_value = ["job1", "job2"] + + manager = WorkerManager(specs) + await manager.start() + + await asyncio.sleep(1.5) + + await manager.stop() + + assert mock_requeue.call_count >= 1 + + @pytest.mark.asyncio + async def test_reaper_loop_handles_exceptions(self): + """ + Тест, что реапер обрабатывает исключения и продолжает работу. + """ + specs = [WorkerSpec(queue="test", concurrency=1)] + + with patch("dataloader.workers.manager.APP_CTX") as mock_ctx, \ + patch("dataloader.workers.manager.APP_CONFIG") as mock_cfg, \ + patch("dataloader.workers.manager.PGWorker") as mock_worker_cls, \ + patch("dataloader.workers.manager.requeue_lost") as mock_requeue: + + mock_logger = Mock() + mock_ctx.get_logger.return_value = mock_logger + mock_cfg.worker.heartbeat_sec = 10 + mock_cfg.worker.claim_backoff_sec = 5 + mock_cfg.worker.reaper_period_sec = 0.5 + + mock_session = AsyncMock() + mock_sm = MagicMock() + mock_sm.return_value.__aenter__.return_value = mock_session + mock_ctx.sessionmaker = mock_sm + + mock_worker_instance = Mock() + mock_worker_instance.run = AsyncMock() + mock_worker_cls.return_value = mock_worker_instance + + mock_requeue.side_effect = [Exception("DB error"), []] + + manager = WorkerManager(specs) + await manager.start() + + await asyncio.sleep(1.2) + + await manager.stop() + + assert mock_logger.exception.call_count >= 1 + + +@pytest.mark.unit +class TestBuildManagerFromEnv: + """ + Unit тесты для build_manager_from_env. + """ + + def test_builds_manager_from_config(self): + """ + Тест создания менеджера из конфигурации. + """ + with patch("dataloader.workers.manager.APP_CTX") as mock_ctx, \ + patch("dataloader.workers.manager.APP_CONFIG") as mock_cfg: + + mock_ctx.get_logger.return_value = Mock() + mock_cfg.worker.parsed_workers.return_value = [ + {"queue": "queue1", "concurrency": 2}, + {"queue": "queue2", "concurrency": 3}, + ] + + manager = build_manager_from_env() + + assert len(manager._specs) == 2 + assert manager._specs[0].queue == "queue1" + assert manager._specs[0].concurrency == 2 + assert manager._specs[1].queue == "queue2" + assert manager._specs[1].concurrency == 3 + + def test_skips_empty_queue_names(self): + """ + Тест, что пустые имена очередей пропускаются. + """ + with patch("dataloader.workers.manager.APP_CTX") as mock_ctx, \ + patch("dataloader.workers.manager.APP_CONFIG") as mock_cfg: + + mock_ctx.get_logger.return_value = Mock() + mock_cfg.worker.parsed_workers.return_value = [ + {"queue": "", "concurrency": 2}, + {"queue": "valid_queue", "concurrency": 1}, + {"queue": " ", "concurrency": 3}, + ] + + manager = build_manager_from_env() + + assert len(manager._specs) == 1 + assert manager._specs[0].queue == "valid_queue" + + def test_handles_missing_fields_with_defaults(self): + """ + Тест обработки отсутствующих полей с дефолтными значениями. + """ + with patch("dataloader.workers.manager.APP_CTX") as mock_ctx, \ + patch("dataloader.workers.manager.APP_CONFIG") as mock_cfg: + + mock_ctx.get_logger.return_value = Mock() + mock_cfg.worker.parsed_workers.return_value = [ + {"queue": "test"}, + {"queue": "test2", "concurrency": 0}, + ] + + manager = build_manager_from_env() + + assert len(manager._specs) == 2 + assert manager._specs[0].concurrency == 1 + assert manager._specs[1].concurrency == 1 + + def test_ensures_minimum_concurrency_of_one(self): + """ + Тест, что concurrency всегда минимум 1. + """ + with patch("dataloader.workers.manager.APP_CTX") as mock_ctx, \ + patch("dataloader.workers.manager.APP_CONFIG") as mock_cfg: + + mock_ctx.get_logger.return_value = Mock() + mock_cfg.worker.parsed_workers.return_value = [ + {"queue": "test1", "concurrency": 0}, + {"queue": "test2", "concurrency": -5}, + ] + + manager = build_manager_from_env() + + assert len(manager._specs) == 2 + assert manager._specs[0].concurrency == 1 + assert manager._specs[1].concurrency == 1