diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..16ec8bb --- /dev/null +++ b/.coveragerc @@ -0,0 +1,3 @@ +[run] +omit = + src/dataloader/logger/* diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..647b68f --- /dev/null +++ b/pytest.ini @@ -0,0 +1,14 @@ +[pytest] +testpaths = tests +python_files = test_*.py +python_classes = Test* +python_functions = test_* +asyncio_mode = auto +asyncio_default_fixture_loop_scope = function +markers = + integration: integration tests requiring database + unit: unit tests without external dependencies +addopts = + -v + --tb=short + --strict-markers diff --git a/src/dataloader/api/schemas.py b/src/dataloader/api/schemas.py index 6c8c467..8ee2035 100644 --- a/src/dataloader/api/schemas.py +++ b/src/dataloader/api/schemas.py @@ -1,22 +1,19 @@ -from pydantic import BaseModel, Field +from pydantic import BaseModel, ConfigDict, Field + class HealthResponse(BaseModel): """Ответ для ручки /health""" - status: str = Field(default="running", description="Service health check", max_length=7) + model_config = ConfigDict( + json_schema_extra={"example": {"status": "running"}} + ) - class Config: - json_schema_extra = {"example": {"status": "running"}} + status: str = Field(default="running", description="Service health check", max_length=7) class InfoResponse(BaseModel): """Ответ для ручки /info""" - name: str = Field(description="Service name", max_length=50) - description: str = Field(description="Service description", max_length=200) - type: str = Field(default="REST API", description="Service type", max_length=20) - version: str = Field(description="Service version", max_length=20, pattern=r"^\d+\.\d+\.\d+") - - class Config: - json_schema_extra = { + model_config = ConfigDict( + json_schema_extra={ "example": { "name": "rest-template", "description": "Python 'AI gateway' template for developing REST microservices", @@ -24,7 +21,16 @@ class InfoResponse(BaseModel): "version": "0.1.0" } } + ) + + name: str = Field(description="Service name", max_length=50) + description: str = Field(description="Service description", max_length=200) + type: str = Field(default="REST API", description="Service type", max_length=20) + version: str = Field(description="Service version", max_length=20, pattern=r"^\d+\.\d+\.\d+") class RateResponse(BaseModel): + """Ответ для записи рейтинга""" + model_config = ConfigDict(str_strip_whitespace=True) + rating_result: str = Field(description="Rating that was recorded", max_length=50) diff --git a/src/dataloader/api/v1/schemas.py b/src/dataloader/api/v1/schemas.py index 3ec39f0..bd52ccb 100644 --- a/src/dataloader/api/v1/schemas.py +++ b/src/dataloader/api/v1/schemas.py @@ -5,13 +5,15 @@ from datetime import datetime, timezone from typing import Any, Optional from uuid import UUID -from pydantic import BaseModel, Field, field_validator +from pydantic import BaseModel, ConfigDict, Field, field_validator class TriggerJobRequest(BaseModel): """ Запрос на постановку задачи в очередь. """ + model_config = ConfigDict(str_strip_whitespace=True) + queue: str = Field(...) task: str = Field(...) args: dict[str, Any] = Field(default_factory=dict) @@ -37,6 +39,8 @@ class TriggerJobResponse(BaseModel): """ Ответ на постановку задачи. """ + model_config = ConfigDict(str_strip_whitespace=True) + job_id: UUID = Field(...) status: str = Field(...) @@ -45,6 +49,8 @@ class JobStatusResponse(BaseModel): """ Текущий статус задачи. """ + model_config = ConfigDict(str_strip_whitespace=True) + job_id: UUID = Field(...) 
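A note on the new pytest.ini above: with `asyncio_mode = auto`, plain `async def` tests are collected and awaited without an explicit `@pytest.mark.asyncio`, and `--strict-markers` means only the declared `unit` and `integration` markers are accepted. A minimal sketch of a test relying on that configuration (hypothetical file, not part of the diff):

```python
# tests/unit_tests/test_example.py - hypothetical file, not part of the diff
import pytest


@pytest.mark.unit  # declared in pytest.ini, so --strict-markers accepts it
async def test_math_still_works():
    # asyncio_mode = auto: async tests run without an explicit asyncio marker
    assert 1 + 1 == 2
```

Categories can then be filtered with `pytest -m unit` or `pytest -m integration`.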
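The schema hunks above (and the ones continuing below) replace the Pydantic v1-style inner `class Config` with v2's `model_config = ConfigDict(...)` and add `str_strip_whitespace=True` to the request/response models. A minimal sketch of the pattern, using an illustrative model name not taken from the diff:

```python
from pydantic import BaseModel, ConfigDict, Field


class ExampleResponse(BaseModel):
    """Hypothetical model showing the v2 configuration style used above."""
    model_config = ConfigDict(
        str_strip_whitespace=True,                              # trim surrounding whitespace on str fields
        json_schema_extra={"example": {"status": "running"}},   # feeds the OpenAPI example
    )

    status: str = Field(default="running", description="Service health check")


print(ExampleResponse(status="  running  ").status)  # -> "running"
```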
status: str = Field(...) attempt: int = Field(...) @@ -59,5 +65,7 @@ class CancelJobResponse(BaseModel): """ Ответ на запрос отмены задачи. """ + model_config = ConfigDict(str_strip_whitespace=True) + job_id: UUID = Field(...) status: str = Field(...) diff --git a/src/dataloader/storage/models/__init__.py b/src/dataloader/storage/models/__init__.py new file mode 100644 index 0000000..791b2d5 --- /dev/null +++ b/src/dataloader/storage/models/__init__.py @@ -0,0 +1,16 @@ +# src/dataloader/storage/models/__init__.py +""" +ORM модели для работы с базой данных. +Организованы по доменам для масштабируемости. +""" +from __future__ import annotations + +from .base import Base +from .queue import DLJob, DLJobEvent, dl_status_enum + +__all__ = [ + "Base", + "DLJob", + "DLJobEvent", + "dl_status_enum", +] diff --git a/src/dataloader/storage/models/base.py b/src/dataloader/storage/models/base.py new file mode 100644 index 0000000..1581e31 --- /dev/null +++ b/src/dataloader/storage/models/base.py @@ -0,0 +1,12 @@ +# src/dataloader/storage/models/base.py +from __future__ import annotations + +from sqlalchemy.orm import DeclarativeBase + + +class Base(DeclarativeBase): + """ + Базовый класс для всех ORM моделей приложения. + Используется SQLAlchemy 2.0+ declarative style. + """ + pass diff --git a/src/dataloader/storage/models.py b/src/dataloader/storage/models/queue.py similarity index 93% rename from src/dataloader/storage/models.py rename to src/dataloader/storage/models/queue.py index 9097f71..54aca7b 100644 --- a/src/dataloader/storage/models.py +++ b/src/dataloader/storage/models/queue.py @@ -1,4 +1,4 @@ -# src/dataloader/storage/models.py +# src/dataloader/storage/models/queue.py from __future__ import annotations from datetime import datetime @@ -6,14 +6,9 @@ from typing import Any, Optional from sqlalchemy import BigInteger, DateTime, Text from sqlalchemy.dialects.postgresql import ENUM, JSONB, UUID -from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column +from sqlalchemy.orm import Mapped, mapped_column - -class Base(DeclarativeBase): - """ - Базовый класс для всех ORM моделей. - """ - pass +from .base import Base dl_status_enum = ENUM( diff --git a/src/dataloader/storage/repositories/__init__.py b/src/dataloader/storage/repositories/__init__.py new file mode 100644 index 0000000..5039b2c --- /dev/null +++ b/src/dataloader/storage/repositories/__init__.py @@ -0,0 +1,12 @@ +# src/dataloader/storage/repositories/__init__.py +""" +Репозитории для работы с базой данных. +Организованы по доменам для масштабируемости. +""" +from __future__ import annotations + +from .queue import QueueRepository + +__all__ = [ + "QueueRepository", +] diff --git a/src/dataloader/storage/repositories.py b/src/dataloader/storage/repositories/queue.py similarity index 99% rename from src/dataloader/storage/repositories.py rename to src/dataloader/storage/repositories/queue.py index 9d448a7..98f0efd 100644 --- a/src/dataloader/storage/repositories.py +++ b/src/dataloader/storage/repositories/queue.py @@ -1,4 +1,4 @@ -# src/dataloader/storage/repositories.py +# src/dataloader/storage/repositories/queue.py from __future__ import annotations from datetime import datetime, timedelta, timezone diff --git a/src/dataloader/storage/schemas/__init__.py b/src/dataloader/storage/schemas/__init__.py new file mode 100644 index 0000000..c0d536d --- /dev/null +++ b/src/dataloader/storage/schemas/__init__.py @@ -0,0 +1,13 @@ +# src/dataloader/storage/schemas/__init__.py +""" +DTO (Data Transfer Objects) для слоя хранилища. 
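The storage layer above is split from single modules into per-domain packages (`models/`, `repositories/`, `schemas/`), with the `DeclarativeBase` pulled out into `models/base.py` so every domain shares one metadata. A hedged sketch of how a future domain model would plug in (`DLDataset` and its columns are hypothetical, not part of the diff):

```python
# src/dataloader/storage/models/datasets.py - hypothetical new domain module
from __future__ import annotations

from sqlalchemy import BigInteger, Text
from sqlalchemy.orm import Mapped, mapped_column

from .base import Base  # the shared DeclarativeBase introduced above


class DLDataset(Base):
    """Illustrative table; subclassing Base registers it on the shared metadata."""
    __tablename__ = "dl_datasets"

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
    name: Mapped[str] = mapped_column(Text, nullable=False)
```

Re-exporting the class from `models/__init__.py` keeps the public import path stable and ensures `Base.metadata.create_all` sees the new table.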
+Организованы по доменам для масштабируемости. +""" +from __future__ import annotations + +from .queue import CreateJobRequest, JobStatus + +__all__ = [ + "CreateJobRequest", + "JobStatus", +] diff --git a/src/dataloader/storage/schemas.py b/src/dataloader/storage/schemas/queue.py similarity index 95% rename from src/dataloader/storage/schemas.py rename to src/dataloader/storage/schemas/queue.py index 92a9ddd..be07545 100644 --- a/src/dataloader/storage/schemas.py +++ b/src/dataloader/storage/schemas/queue.py @@ -1,4 +1,4 @@ -# src/dataloader/storage/schemas.py +# src/dataloader/storage/schemas/queue.py from __future__ import annotations from dataclasses import dataclass diff --git a/tests/conftest.py b/tests/conftest.py index fce0f1e..c123c3b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,179 +1,104 @@ # tests/conftest.py +from __future__ import annotations + import asyncio import sys from typing import AsyncGenerator +from uuid import uuid4 import pytest +import pytest_asyncio from dotenv import load_dotenv from httpx import AsyncClient, ASGITransport from sqlalchemy import text -from sqlalchemy.ext.asyncio import ( - AsyncEngine, - AsyncSession, - async_sessionmaker, - create_async_engine, -) -from sqlalchemy.exc import ProgrammingError -from asyncpg.exceptions import InvalidCatalogNameError +from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker -# Load .env before other imports to ensure config is available load_dotenv() from dataloader.api import app_main -from dataloader.api.v1.router import get_service -from dataloader.api.v1.service import JobsService -from dataloader.context import APP_CTX -from dataloader.storage.db import Base from dataloader.config import APP_CONFIG +from dataloader.context import APP_CTX, get_session +from dataloader.storage.models import Base +from dataloader.storage.engine import create_engine, create_sessionmaker -# For Windows: use SelectorEventLoop which is more stable if sys.platform == "win32": asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) -@pytest.fixture(scope="session") +@pytest_asyncio.fixture(scope="function") async def db_engine() -> AsyncGenerator[AsyncEngine, None]: """ - Creates a temporary, isolated test database for the entire test session. - - Connects to the default 'postgres' database to create a new test DB. - - Yields an engine connected to the new test DB. - - Drops the test DB after the session is complete. + Создаёт тестовый движок для теста. + Использует реальную БД из конфига. 
""" - pg_settings = APP_CONFIG.pg - test_db_name = f"{pg_settings.database}_test" - - # DSN for connecting to 'postgres' DB to manage the test DB - system_dsn = pg_settings.url.replace(f"/{pg_settings.database}", "/postgres") - system_engine = create_async_engine(system_dsn, isolation_level="AUTOCOMMIT") - - # DSN for the new test database - test_dsn = pg_settings.url.replace(f"/{pg_settings.database}", f"/{test_db_name}") - - async with system_engine.connect() as conn: - # Drop the test DB if it exists from a previous failed run - await conn.execute(text(f"DROP DATABASE IF EXISTS {test_db_name} WITH (FORCE)")) - await conn.execute(text(f"CREATE DATABASE {test_db_name}")) - - # Create an engine connected to the new test database - engine = create_async_engine(test_dsn) - - # Create all tables and DDL objects - async with engine.begin() as conn: - # Execute DDL statements one by one - await conn.execute(text("CREATE TYPE dl_status AS ENUM ('queued','running','succeeded','failed','canceled','lost')")) - - # Create tables - await conn.run_sync(Base.metadata.create_all) - - # Create indexes - await conn.execute(text("CREATE INDEX ix_dl_jobs_claim ON dl_jobs(queue, available_at, priority, created_at) WHERE status = 'queued'")) - await conn.execute(text("CREATE INDEX ix_dl_jobs_running_lease ON dl_jobs(lease_expires_at) WHERE status = 'running'")) - await conn.execute(text("CREATE INDEX ix_dl_jobs_status_queue ON dl_jobs(status, queue)")) - - # Create function and triggers - await conn.execute( - text( - """ - CREATE OR REPLACE FUNCTION notify_job_ready() RETURNS trigger AS $$ - BEGIN - IF (TG_OP = 'INSERT') THEN - PERFORM pg_notify('dl_jobs', NEW.queue); - RETURN NEW; - ELSIF (TG_OP = 'UPDATE') THEN - IF NEW.status = 'queued' AND NEW.available_at <= now() - AND (OLD.status IS DISTINCT FROM NEW.status OR OLD.available_at IS DISTINCT FROM NEW.available_at) THEN - PERFORM pg_notify('dl_jobs', NEW.queue); - END IF; - RETURN NEW; - END IF; - RETURN NEW; - END $$ LANGUAGE plpgsql; - """ - ) - ) - await conn.execute(text("CREATE TRIGGER dl_jobs_notify_ins AFTER INSERT ON dl_jobs FOR EACH ROW EXECUTE FUNCTION notify_job_ready()")) - await conn.execute(text("CREATE TRIGGER dl_jobs_notify_upd AFTER UPDATE OF status, available_at ON dl_jobs FOR EACH ROW EXECUTE FUNCTION notify_job_ready()")) + engine = create_engine(APP_CONFIG.pg.url) yield engine - # --- Teardown --- await engine.dispose() - # Drop the test database - async with system_engine.connect() as conn: - await conn.execute(text(f"DROP DATABASE {test_db_name} WITH (FORCE)")) - await system_engine.dispose() -@pytest.fixture(scope="session") -async def app_context(db_engine: AsyncEngine) -> AsyncGenerator[None, None]: - """ - Overrides the global APP_CTX with the test database engine and sessionmaker. - This ensures that the app, when tested, uses the isolated test DB. - """ - original_engine = APP_CTX._engine - original_sessionmaker = APP_CTX._sessionmaker - APP_CTX._engine = db_engine - APP_CTX._sessionmaker = async_sessionmaker(db_engine, expire_on_commit=False, class_=AsyncSession) - - yield - - # Restore original context - APP_CTX._engine = original_engine - APP_CTX._sessionmaker = original_sessionmaker - - -@pytest.fixture +@pytest_asyncio.fixture(scope="function") async def db_session(db_engine: AsyncEngine) -> AsyncGenerator[AsyncSession, None]: """ - Provides a transactional session for tests. - A single connection is acquired from the engine, a transaction is started, - and a new session is created bound to that connection. 
At the end of the test, - the transaction is rolled back and the connection is closed, ensuring - perfect test isolation. + Предоставляет сессию БД для каждого теста. + НЕ использует транзакцию, чтобы работали advisory locks. """ - connection = await db_engine.connect() - trans = await connection.begin() - - # Create a sessionmaker bound to the single connection - TestSession = async_sessionmaker( - bind=connection, - expire_on_commit=False, - class_=AsyncSession - ) - session = TestSession() - - try: + sessionmaker = async_sessionmaker(bind=db_engine, expire_on_commit=False, class_=AsyncSession) + async with sessionmaker() as session: yield session - finally: - # Clean up the session, rollback the transaction, and close the connection - await session.close() - await trans.rollback() - await connection.close() + await session.rollback() -@pytest.fixture +@pytest_asyncio.fixture(scope="function") +async def clean_queue_tables(db_session: AsyncSession) -> None: + """ + Очищает таблицы очереди перед каждым тестом. + """ + schema = APP_CONFIG.pg.schema_queue + await db_session.execute(text(f"TRUNCATE TABLE {schema}.dl_job_events CASCADE")) + await db_session.execute(text(f"TRUNCATE TABLE {schema}.dl_jobs CASCADE")) + await db_session.commit() + + +@pytest_asyncio.fixture async def client(db_session: AsyncSession) -> AsyncGenerator[AsyncClient, None]: """ - Provides an HTTP client for the FastAPI application. - It overrides the 'get_session' dependency to use the test-scoped, - transactional session provided by the 'db_session' fixture. This ensures - that API calls operate within the same transaction as the test function, - allowing for consistent state checks. + HTTP клиент для тестирования API. """ async def override_get_session() -> AsyncGenerator[AsyncSession, None]: - """This override simply yields the session created by the db_session fixture.""" yield db_session - # The service depends on the session, so we override the session provider - # that is used by the service via Depends(get_session) - from dataloader.context import get_session app_main.dependency_overrides[get_session] = override_get_session transport = ASGITransport(app=app_main) async with AsyncClient(transport=transport, base_url="http://test") as c: yield c - # Clean up the dependency override app_main.dependency_overrides.clear() + + +@pytest.fixture +def job_id() -> str: + """ + Генерирует уникальный job_id для тестов. + """ + return str(uuid4()) + + +@pytest.fixture +def queue_name() -> str: + """ + Возвращает имя тестовой очереди. + """ + return "test.queue" + + +@pytest.fixture +def task_name() -> str: + """ + Возвращает имя тестовой задачи. + """ + return "test.task" diff --git a/tests/integration_tests/__init__.py b/tests/integration_tests/__init__.py new file mode 100644 index 0000000..d6c43f2 --- /dev/null +++ b/tests/integration_tests/__init__.py @@ -0,0 +1 @@ +# tests/integration_tests/__init__.py diff --git a/tests/integration_tests/test_api_endpoints.py b/tests/integration_tests/test_api_endpoints.py new file mode 100644 index 0000000..216cccc --- /dev/null +++ b/tests/integration_tests/test_api_endpoints.py @@ -0,0 +1,172 @@ +# tests/integration_tests/test_api_endpoints.py +from __future__ import annotations + +import pytest +from httpx import AsyncClient + + +@pytest.mark.integration +class TestJobsAPI: + """ + Интеграционные тесты для API endpoints. 
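The `client` fixture above swaps the app's `get_session` dependency for one that yields the test-scoped session; combined with `clean_queue_tables`, isolation comes from `TRUNCATE` rather than an outer rolled-back transaction, which keeps Postgres advisory locks usable. A minimal, self-contained sketch of the override mechanism (hypothetical app and dependency, not the project's real modules):

```python
from fastapi import Depends, FastAPI

app = FastAPI()


async def get_session():
    # stands in for the real session provider
    yield "real-session"


@app.get("/demo")
async def demo(session: str = Depends(get_session)) -> dict[str, str]:
    return {"session": session}


async def override_get_session():
    yield "test-session"


# every Depends(get_session) in the app now receives the test session
app.dependency_overrides[get_session] = override_get_session
```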
+ """ + + async def test_trigger_job_creates_new_job( + self, + client: AsyncClient, + clean_queue_tables, + queue_name: str, + task_name: str, + ): + """ + Тест создания новой задачи через API. + """ + payload = { + "queue": queue_name, + "task": task_name, + "args": {"test_key": "test_value"}, + "lock_key": "lock_api_1", + "partition_key": "part1", + "priority": 100, + "max_attempts": 5, + "lease_ttl_sec": 60, + } + + response = await client.post("/api/v1/jobs/trigger", json=payload) + + assert response.status_code == 200 + data = response.json() + assert "job_id" in data + assert data["status"] == "queued" + + async def test_trigger_job_with_idempotency_key( + self, + client: AsyncClient, + clean_queue_tables, + queue_name: str, + task_name: str, + ): + """ + Тест идемпотентности через idempotency_key. + """ + payload = { + "queue": queue_name, + "task": task_name, + "args": {}, + "idempotency_key": "unique_key_123", + "lock_key": "lock_idem", + "priority": 100, + "max_attempts": 5, + "lease_ttl_sec": 60, + } + + response1 = await client.post("/api/v1/jobs/trigger", json=payload) + response2 = await client.post("/api/v1/jobs/trigger", json=payload) + + assert response1.status_code == 200 + assert response2.status_code == 200 + + data1 = response1.json() + data2 = response2.json() + + assert data1["job_id"] == data2["job_id"] + assert data1["status"] == data2["status"] == "queued" + + async def test_get_status_returns_job_status( + self, + client: AsyncClient, + clean_queue_tables, + queue_name: str, + task_name: str, + ): + """ + Тест получения статуса задачи через API. + """ + payload = { + "queue": queue_name, + "task": task_name, + "args": {}, + "lock_key": "lock_status", + "priority": 100, + "max_attempts": 5, + "lease_ttl_sec": 60, + } + + create_response = await client.post("/api/v1/jobs/trigger", json=payload) + job_id = create_response.json()["job_id"] + + status_response = await client.get(f"/api/v1/jobs/{job_id}/status") + + assert status_response.status_code == 200 + data = status_response.json() + assert data["job_id"] == job_id + assert data["status"] == "queued" + assert data["attempt"] == 0 + + async def test_get_status_returns_404_for_nonexistent_job( + self, + client: AsyncClient, + clean_queue_tables, + ): + """ + Тест получения статуса несуществующей задачи. + """ + fake_job_id = "00000000-0000-0000-0000-000000000000" + + response = await client.get(f"/api/v1/jobs/{fake_job_id}/status") + + assert response.status_code == 404 + + async def test_cancel_job_sets_cancel_flag( + self, + client: AsyncClient, + clean_queue_tables, + queue_name: str, + task_name: str, + ): + """ + Тест отмены задачи через API. + """ + payload = { + "queue": queue_name, + "task": task_name, + "args": {}, + "lock_key": "lock_cancel", + "priority": 100, + "max_attempts": 5, + "lease_ttl_sec": 60, + } + + create_response = await client.post("/api/v1/jobs/trigger", json=payload) + job_id = create_response.json()["job_id"] + + cancel_response = await client.post(f"/api/v1/jobs/{job_id}/cancel") + + assert cancel_response.status_code == 200 + data = cancel_response.json() + assert data["job_id"] == job_id + + async def test_cancel_nonexistent_job_returns_404( + self, + client: AsyncClient, + clean_queue_tables, + ): + """ + Тест отмены несуществующей задачи. 
+ """ + fake_job_id = "00000000-0000-0000-0000-000000000000" + + response = await client.post(f"/api/v1/jobs/{fake_job_id}/cancel") + + assert response.status_code == 404 + + async def test_health_endpoint_returns_200( + self, + client: AsyncClient, + ): + """ + Тест health check endpoint. + """ + response = await client.get("/health") + + assert response.status_code == 200 diff --git a/tests/integration_tests/test_queue_repository.py b/tests/integration_tests/test_queue_repository.py new file mode 100644 index 0000000..917aff6 --- /dev/null +++ b/tests/integration_tests/test_queue_repository.py @@ -0,0 +1,453 @@ +# tests/integration_tests/test_queue_repository.py +from __future__ import annotations + +from datetime import datetime, timezone + +import pytest +from sqlalchemy.ext.asyncio import AsyncSession + +from dataloader.storage.repositories import QueueRepository +from dataloader.storage.schemas import CreateJobRequest, JobStatus + + +@pytest.mark.integration +class TestQueueRepository: + """ + Интеграционные тесты для QueueRepository. + """ + + async def test_create_or_get_creates_new_job( + self, + db_session: AsyncSession, + clean_queue_tables, + job_id: str, + queue_name: str, + task_name: str, + ): + """ + Тест создания новой задачи в очереди. + """ + repo = QueueRepository(db_session) + + req = CreateJobRequest( + job_id=job_id, + queue=queue_name, + task=task_name, + args={"param": "value"}, + idempotency_key="test_key_1", + lock_key="test_lock_1", + partition_key="part1", + priority=100, + available_at=datetime.now(timezone.utc), + max_attempts=5, + lease_ttl_sec=60, + producer="test_producer", + consumer_group="test_group", + ) + + created_id, status = await repo.create_or_get(req) + + assert created_id == job_id + assert status == "queued" + + async def test_create_or_get_returns_existing_job( + self, + db_session: AsyncSession, + clean_queue_tables, + job_id: str, + queue_name: str, + task_name: str, + ): + """ + Тест идемпотентности - повторный вызов возвращает существующую задачу. + """ + repo = QueueRepository(db_session) + + req = CreateJobRequest( + job_id=job_id, + queue=queue_name, + task=task_name, + args={}, + idempotency_key="idempotent_key_1", + lock_key="lock1", + partition_key="", + priority=100, + available_at=datetime.now(timezone.utc), + max_attempts=5, + lease_ttl_sec=60, + producer=None, + consumer_group=None, + ) + + created_id_1, status_1 = await repo.create_or_get(req) + + req_2 = CreateJobRequest( + job_id="different_job_id", + queue="different_queue", + task="different_task", + args={}, + idempotency_key="idempotent_key_1", + lock_key="lock2", + partition_key="", + priority=200, + available_at=datetime.now(timezone.utc), + max_attempts=3, + lease_ttl_sec=30, + producer=None, + consumer_group=None, + ) + + created_id_2, status_2 = await repo.create_or_get(req_2) + + assert created_id_1 == created_id_2 == job_id + assert status_1 == status_2 == "queued" + + async def test_get_status_returns_job_status( + self, + db_session: AsyncSession, + clean_queue_tables, + job_id: str, + queue_name: str, + task_name: str, + ): + """ + Тест получения статуса задачи. 
+ """ + repo = QueueRepository(db_session) + + req = CreateJobRequest( + job_id=job_id, + queue=queue_name, + task=task_name, + args={"key": "val"}, + idempotency_key=None, + lock_key="lock", + partition_key="", + priority=100, + available_at=datetime.now(timezone.utc), + max_attempts=5, + lease_ttl_sec=60, + producer=None, + consumer_group=None, + ) + + await repo.create_or_get(req) + + status = await repo.get_status(job_id) + + assert status is not None + assert status.job_id == job_id + assert status.status == "queued" + assert status.attempt == 0 + assert status.error is None + + async def test_get_status_returns_none_for_nonexistent_job( + self, + db_session: AsyncSession, + clean_queue_tables, + ): + """ + Тест получения статуса несуществующей задачи. + """ + repo = QueueRepository(db_session) + + status = await repo.get_status("00000000-0000-0000-0000-000000000000") + + assert status is None + + async def test_cancel_sets_cancel_requested_flag( + self, + db_session: AsyncSession, + clean_queue_tables, + job_id: str, + queue_name: str, + task_name: str, + ): + """ + Тест установки флага отмены. + """ + repo = QueueRepository(db_session) + + req = CreateJobRequest( + job_id=job_id, + queue=queue_name, + task=task_name, + args={}, + idempotency_key=None, + lock_key="lock", + partition_key="", + priority=100, + available_at=datetime.now(timezone.utc), + max_attempts=5, + lease_ttl_sec=60, + producer=None, + consumer_group=None, + ) + + await repo.create_or_get(req) + + result = await repo.cancel(job_id) + assert result is True + + status = await repo.get_status(job_id) + assert status is not None + + async def test_claim_one_returns_job_for_processing( + self, + db_session: AsyncSession, + clean_queue_tables, + job_id: str, + queue_name: str, + task_name: str, + ): + """ + Тест захвата задачи для обработки. + """ + repo = QueueRepository(db_session) + + req = CreateJobRequest( + job_id=job_id, + queue=queue_name, + task=task_name, + args={"data": "test"}, + idempotency_key=None, + lock_key="lock_claim", + partition_key="partition1", + priority=50, + available_at=datetime.now(timezone.utc), + max_attempts=5, + lease_ttl_sec=120, + producer=None, + consumer_group=None, + ) + + await repo.create_or_get(req) + + claimed = await repo.claim_one(queue_name, claim_backoff_sec=15) + + assert claimed is not None + assert claimed["job_id"] == job_id + assert claimed["queue"] == queue_name + assert claimed["task"] == task_name + assert claimed["args"] == {"data": "test"} + assert claimed["lock_key"] == "lock_claim" + assert claimed["attempt"] == 1 + + status = await repo.get_status(job_id) + assert status is not None + assert status.status == "running" + assert status.attempt == 1 + + async def test_claim_one_returns_none_when_no_jobs( + self, + db_session: AsyncSession, + clean_queue_tables, + queue_name: str, + ): + """ + Тест захвата при пустой очереди. + """ + repo = QueueRepository(db_session) + + claimed = await repo.claim_one(queue_name, claim_backoff_sec=15) + + assert claimed is None + + async def test_heartbeat_updates_lease( + self, + db_session: AsyncSession, + clean_queue_tables, + job_id: str, + queue_name: str, + task_name: str, + ): + """ + Тест обновления heartbeat и продления lease. 
+ """ + repo = QueueRepository(db_session) + + req = CreateJobRequest( + job_id=job_id, + queue=queue_name, + task=task_name, + args={}, + idempotency_key=None, + lock_key="lock_hb", + partition_key="", + priority=100, + available_at=datetime.now(timezone.utc), + max_attempts=5, + lease_ttl_sec=60, + producer=None, + consumer_group=None, + ) + + await repo.create_or_get(req) + await repo.claim_one(queue_name, claim_backoff_sec=15) + + success, cancel_requested = await repo.heartbeat(job_id, ttl_sec=90) + + assert success is True + assert cancel_requested is False + + async def test_finish_ok_marks_job_succeeded( + self, + db_session: AsyncSession, + clean_queue_tables, + job_id: str, + queue_name: str, + task_name: str, + ): + """ + Тест успешного завершения задачи. + """ + repo = QueueRepository(db_session) + + req = CreateJobRequest( + job_id=job_id, + queue=queue_name, + task=task_name, + args={}, + idempotency_key=None, + lock_key="lock_finish", + partition_key="", + priority=100, + available_at=datetime.now(timezone.utc), + max_attempts=5, + lease_ttl_sec=60, + producer=None, + consumer_group=None, + ) + + await repo.create_or_get(req) + await repo.claim_one(queue_name, claim_backoff_sec=15) + + await repo.finish_ok(job_id) + + status = await repo.get_status(job_id) + assert status is not None + assert status.status == "succeeded" + assert status.finished_at is not None + + async def test_finish_fail_or_retry_requeues_on_retry( + self, + db_session: AsyncSession, + clean_queue_tables, + job_id: str, + queue_name: str, + task_name: str, + ): + """ + Тест повторной постановки при ошибке с возможностью retry. + """ + repo = QueueRepository(db_session) + + req = CreateJobRequest( + job_id=job_id, + queue=queue_name, + task=task_name, + args={}, + idempotency_key=None, + lock_key="lock_retry", + partition_key="", + priority=100, + available_at=datetime.now(timezone.utc), + max_attempts=3, + lease_ttl_sec=60, + producer=None, + consumer_group=None, + ) + + await repo.create_or_get(req) + await repo.claim_one(queue_name, claim_backoff_sec=15) + + await repo.finish_fail_or_retry(job_id, err="Test error") + + status = await repo.get_status(job_id) + assert status is not None + assert status.status == "queued" + assert status.error == "Test error" + assert status.attempt == 1 + + async def test_finish_fail_or_retry_marks_failed_when_max_attempts_reached( + self, + db_session: AsyncSession, + clean_queue_tables, + job_id: str, + queue_name: str, + task_name: str, + ): + """ + Тест финальной ошибки при достижении max_attempts. + """ + repo = QueueRepository(db_session) + + req = CreateJobRequest( + job_id=job_id, + queue=queue_name, + task=task_name, + args={}, + idempotency_key=None, + lock_key="lock_fail", + partition_key="", + priority=100, + available_at=datetime.now(timezone.utc), + max_attempts=1, + lease_ttl_sec=60, + producer=None, + consumer_group=None, + ) + + await repo.create_or_get(req) + await repo.claim_one(queue_name, claim_backoff_sec=15) + + await repo.finish_fail_or_retry(job_id, err="Final error") + + status = await repo.get_status(job_id) + assert status is not None + assert status.status == "failed" + assert status.error == "Final error" + assert status.finished_at is not None + + async def test_requeue_lost_returns_expired_jobs( + self, + db_session: AsyncSession, + clean_queue_tables, + job_id: str, + queue_name: str, + task_name: str, + ): + """ + Тест reaper - возврат протухших задач в очередь. 
+ """ + repo = QueueRepository(db_session) + + req = CreateJobRequest( + job_id=job_id, + queue=queue_name, + task=task_name, + args={}, + idempotency_key=None, + lock_key="lock_lost", + partition_key="", + priority=100, + available_at=datetime.now(timezone.utc), + max_attempts=5, + lease_ttl_sec=1, + producer=None, + consumer_group=None, + ) + + await repo.create_or_get(req) + await repo.claim_one(queue_name, claim_backoff_sec=15) + + import asyncio + await asyncio.sleep(2) + + requeued = await repo.requeue_lost() + + assert job_id in requeued + + status = await repo.get_status(job_id) + assert status is not None + assert status.status == "queued" diff --git a/tests/integrations/test_api_v1.py b/tests/integrations/test_api_v1.py deleted file mode 100644 index e474f41..0000000 --- a/tests/integrations/test_api_v1.py +++ /dev/null @@ -1,93 +0,0 @@ -# tests/integrations/test_api_v1.py -import pytest -from httpx import AsyncClient - -@pytest.mark.anyio -async def test_trigger_and_get_status_ok(client: AsyncClient): - """ - Тест проверяет успешное создание задачи и получение её статуса. - """ - # 1. Триггер задачи - trigger_payload = { - "queue": "test_queue", - "task": "test_task", - "lock_key": "lock_123", - } - response = await client.post("/api/v1/jobs/trigger", json=trigger_payload) - - # Проверки ответа на триггер - assert response.status_code == 200 - response_data = response.json() - assert "job_id" in response_data - assert response_data["status"] == "queued" - job_id = response_data["job_id"] - - # 2. Получение статуса - response = await client.get(f"/api/v1/jobs/{job_id}/status") - - # Проверки ответа на статус - assert response.status_code == 200 - status_data = response.json() - assert status_data["job_id"] == job_id - assert status_data["status"] == "queued" - assert status_data["attempt"] == 0 - - -@pytest.mark.anyio -async def test_cancel_job_ok(client: AsyncClient): - """ - Тест проверяет успешную отмену задачи. - """ - # 1. Триггер задачи - trigger_payload = { - "queue": "cancel_queue", - "task": "cancel_task", - "lock_key": "lock_cancel", - } - response = await client.post("/api/v1/jobs/trigger", json=trigger_payload) - assert response.status_code == 200 - job_id = response.json()["job_id"] - - # 2. Запрос на отмену - response = await client.post(f"/api/v1/jobs/{job_id}/cancel") - assert response.status_code == 200 - cancel_data = response.json() - assert cancel_data["job_id"] == job_id - # Воркер еще не взял задачу, поэтому статус queued, но cancel_requested уже true - assert cancel_data["status"] == "queued" - - # 3. Проверка статуса после отмены - response = await client.get(f"/api/v1/jobs/{job_id}/status") - assert response.status_code == 200 - status_data = response.json() - assert status_data["job_id"] == job_id - # Задача еще не отменена, а только запрошена к отмене - assert status_data["status"] == "queued" - - -@pytest.mark.anyio -async def test_trigger_duplicate_idempotency_key(client: AsyncClient): - """ - Тест проверяет, что повторная отправка с тем же ключом идемпотентности - возвращает ту же самую задачу. - """ - idempotency_key = "idem_key_123" - trigger_payload = { - "queue": "idempotent_queue", - "task": "idempotent_task", - "lock_key": "lock_idem", - "idempotency_key": idempotency_key, - } - - # 1. Первый запрос - response1 = await client.post("/api/v1/jobs/trigger", json=trigger_payload) - assert response1.status_code == 200 - job_id1 = response1.json()["job_id"] - - # 2. 
Второй запрос с тем же ключом - response2 = await client.post("/api/v1/jobs/trigger", json=trigger_payload) - assert response2.status_code == 200 - job_id2 = response2.json()["job_id"] - - # ID задач должны совпадать - assert job_id1 == job_id2 \ No newline at end of file diff --git a/tests/integrations/test_worker_protocol.py b/tests/integrations/test_worker_protocol.py deleted file mode 100644 index 47f0981..0000000 --- a/tests/integrations/test_worker_protocol.py +++ /dev/null @@ -1,155 +0,0 @@ -# tests/integrations/test_worker_protocol.py -import asyncio -from datetime import datetime, timedelta, timezone -from uuid import uuid4 - -import pytest -from sqlalchemy.ext.asyncio import AsyncSession - -from dataloader.storage.repositories import QueueRepository, CreateJobRequest -from dataloader.context import APP_CTX - -@pytest.mark.anyio -async def test_e2e_worker_protocol_ok(db_session: AsyncSession): - """ - Проверяет полный E2E-сценарий жизненного цикла задачи: - 1. Постановка (create) - 2. Захват (claim) - 3. Пульс (heartbeat) - 4. Успешное завершение (finish_ok) - 5. Проверка статуса - """ - repo = QueueRepository(db_session) - job_id = str(uuid4()) - queue_name = "e2e_ok_queue" - lock_key = f"lock_{job_id}" - - # 1. Постановка задачи - create_req = CreateJobRequest( - job_id=job_id, - queue=queue_name, - task="test_e2e_task", - args={}, - idempotency_key=None, - lock_key=lock_key, - partition_key="", - priority=100, - available_at=datetime.now(timezone.utc), - max_attempts=3, - lease_ttl_sec=30, - producer=None, - consumer_group=None, - ) - await repo.create_or_get(create_req) - - # 2. Захват задачи - claimed_job = await repo.claim_one(queue_name, claim_backoff_sec=10) - assert claimed_job is not None - assert claimed_job["job_id"] == job_id - assert claimed_job["lock_key"] == lock_key - - # 3. Пульс - success, cancel_requested = await repo.heartbeat(job_id, ttl_sec=60) - assert success - assert not cancel_requested - - # 4. Успешное завершение - await repo.finish_ok(job_id) - - # 5. Проверка статуса - status = await repo.get_status(job_id) - assert status is not None - assert status.status == "succeeded" - assert status.finished_at is not None - - -@pytest.mark.anyio -async def test_concurrency_claim_one_locks(db_session: AsyncSession): - """ - Проверяет, что при конкурентном доступе к задачам с одинаковым - lock_key только один воркер может захватить задачу. - """ - repo = QueueRepository(db_session) - queue_name = "concurrency_queue" - lock_key = "concurrent_lock_123" - job_ids = [str(uuid4()), str(uuid4())] - - # 1. Создание двух задач с одинаковым lock_key - for i, job_id in enumerate(job_ids): - create_req = CreateJobRequest( - job_id=job_id, - queue=queue_name, - task=f"task_{i}", - args={}, - idempotency_key=f"idem_con_{i}", - lock_key=lock_key, - partition_key="", - priority=100 + i, - available_at=datetime.now(timezone.utc), - max_attempts=1, - lease_ttl_sec=30, - producer="test", - consumer_group="test_group", - ) - await repo.create_or_get(create_req) - - # 2. Первый воркер захватывает задачу - claimed_job_1 = await repo.claim_one(queue_name, claim_backoff_sec=1) - assert claimed_job_1 is not None - assert claimed_job_1["job_id"] == job_ids[0] - - # 3. Второй воркер пытается захватить задачу, но не может (из-за advisory lock) - claimed_job_2 = await repo.claim_one(queue_name, claim_backoff_sec=1) - assert claimed_job_2 is None - - # 4. Первый воркер освобождает advisory lock (как будто завершил работу) - await repo._advisory_unlock(lock_key) - - # 5. 
Второй воркер теперь может захватить вторую задачу - claimed_job_3 = await repo.claim_one(queue_name, claim_backoff_sec=1) - assert claimed_job_3 is not None - assert claimed_job_3["job_id"] == job_ids[1] - - -@pytest.mark.anyio -async def test_reaper_requeues_lost_jobs(db_session: AsyncSession): - """ - Проверяет, что reaper корректно возвращает "потерянные" задачи в очередь. - """ - repo = QueueRepository(db_session) - job_id = str(uuid4()) - queue_name = "reaper_queue" - - # 1. Создаем и захватываем задачу - create_req = CreateJobRequest( - job_id=job_id, - queue=queue_name, - task="reaper_test_task", - args={}, - idempotency_key="idem_reaper_1", - lock_key="reaper_lock_1", - partition_key="", - priority=100, - available_at=datetime.now(timezone.utc), - max_attempts=3, - lease_ttl_sec=1, # Очень короткий lease - producer=None, - consumer_group=None, - ) - await repo.create_or_get(create_req) - - claimed_job = await repo.claim_one(queue_name, claim_backoff_sec=1) - assert claimed_job is not None - assert claimed_job["job_id"] == job_id - - # 2. Ждем истечения lease - await asyncio.sleep(2) - - # 3. Запускаем reaper - requeued_ids = await repo.requeue_lost() - assert requeued_ids == [job_id] - - # 4. Проверяем статус - status = await repo.get_status(job_id) - assert status is not None - assert status.status == "queued"
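The deleted worker-protocol tests, like the new `db_session` fixture comment about avoiding an outer transaction, revolve around Postgres advisory locks: `claim_one` only hands out a job whose `lock_key` lock can be taken, so two jobs sharing a `lock_key` never run concurrently. A generic sketch of the underlying mechanism with session-level advisory locks (the key hashing and SQL here are illustrative, not the repository's actual implementation):

```python
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


async def try_lock(session: AsyncSession, lock_key: str) -> bool:
    """Attempt to take a session-level advisory lock derived from lock_key."""
    row = await session.execute(
        text("SELECT pg_try_advisory_lock(hashtext(:k)::bigint)"),
        {"k": lock_key},
    )
    return bool(row.scalar())


async def unlock(session: AsyncSession, lock_key: str) -> None:
    """Release the advisory lock taken by try_lock on the same session."""
    await session.execute(
        text("SELECT pg_advisory_unlock(hashtext(:k)::bigint)"),
        {"k": lock_key},
    )
```

Because these locks live on the database connection rather than inside a transaction, the new fixtures prefer plain sessions plus `TRUNCATE` cleanup, which is the rationale the `db_session` docstring gives for dropping the old rollback-based isolation.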