refactor: claude v2

itqop 2025-11-05 14:41:56 +03:00
parent 8facab266d
commit 8bc14e488a
17 changed files with 782 additions and 400 deletions

.coveragerc Normal file

@@ -0,0 +1,3 @@
[run]
omit =
src/dataloader/logger/*

pytest.ini Normal file

@@ -0,0 +1,14 @@
[pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
asyncio_mode = auto
asyncio_default_fixture_loop_scope = function
markers =
integration: integration tests requiring database
unit: unit tests without external dependencies
addopts =
-v
--tb=short
--strict-markers
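With asyncio_mode = auto, pytest-asyncio collects async test functions without per-function decorators, and the registered markers let you select subsets (pytest -m unit, pytest -m integration) while --strict-markers rejects typos. A minimal sketch of a test module relying on this config (the module name and body are illustrative, not from the repo):

# tests/test_example_unit.py (hypothetical module, for illustration only)
import asyncio
import pytest

@pytest.mark.unit
async def test_sleep_zero_returns() -> None:
    # Collected and run as async automatically because asyncio_mode = auto;
    # no explicit @pytest.mark.asyncio decorator is needed.
    await asyncio.sleep(0)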


@@ -1,21 +1,18 @@
from pydantic import BaseModel, Field
from pydantic import BaseModel, ConfigDict, Field
class HealthResponse(BaseModel):
"""Ответ для ручки /health"""
status: str = Field(default="running", description="Service health check", max_length=7)
class Config:
model_config = ConfigDict(
json_schema_extra={"example": {"status": "running"}}
)
status: str = Field(default="running", description="Service health check", max_length=7)
class InfoResponse(BaseModel):
"""Ответ для ручки /info"""
name: str = Field(description="Service name", max_length=50)
description: str = Field(description="Service description", max_length=200)
type: str = Field(default="REST API", description="Service type", max_length=20)
version: str = Field(description="Service version", max_length=20, pattern=r"^\d+\.\d+\.\d+")
class Config:
model_config = ConfigDict(
json_schema_extra={
"example": {
"name": "rest-template",
@@ -24,7 +21,16 @@ class InfoResponse(BaseModel):
"version": "0.1.0"
}
}
)
name: str = Field(description="Service name", max_length=50)
description: str = Field(description="Service description", max_length=200)
type: str = Field(default="REST API", description="Service type", max_length=20)
version: str = Field(description="Service version", max_length=20, pattern=r"^\d+\.\d+\.\d+")
class RateResponse(BaseModel):
"""Ответ для записи рейтинга"""
model_config = ConfigDict(str_strip_whitespace=True)
rating_result: str = Field(description="Rating that was recorded", max_length=50)
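The schema changes above follow the standard Pydantic v1 to v2 migration: the nested class Config is replaced by a model_config = ConfigDict(...) class attribute, with field declarations moved below it. A minimal sketch of the pattern on a toy model (not from the repo):

from pydantic import BaseModel, ConfigDict

class Health(BaseModel):
    # The Pydantic v1 equivalent was:
    #     class Config:
    #         json_schema_extra = {"example": {"status": "running"}}
    model_config = ConfigDict(json_schema_extra={"example": {"status": "running"}})
    status: str = "running"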


@@ -5,13 +5,15 @@ from datetime import datetime, timezone
from typing import Any, Optional
from uuid import UUID
from pydantic import BaseModel, Field, field_validator
from pydantic import BaseModel, ConfigDict, Field, field_validator
class TriggerJobRequest(BaseModel):
"""
Request to enqueue a job.
"""
model_config = ConfigDict(str_strip_whitespace=True)
queue: str = Field(...)
task: str = Field(...)
args: dict[str, Any] = Field(default_factory=dict)
@@ -37,6 +39,8 @@ class TriggerJobResponse(BaseModel):
"""
Response to enqueuing a job.
"""
model_config = ConfigDict(str_strip_whitespace=True)
job_id: UUID = Field(...)
status: str = Field(...)
@@ -45,6 +49,8 @@ class JobStatusResponse(BaseModel):
"""
Current status of a job.
"""
model_config = ConfigDict(str_strip_whitespace=True)
job_id: UUID = Field(...)
status: str = Field(...)
attempt: int = Field(...)
@@ -59,5 +65,7 @@ class CancelJobResponse(BaseModel):
"""
Response to a job cancellation request.
"""
model_config = ConfigDict(str_strip_whitespace=True)
job_id: UUID = Field(...)
status: str = Field(...)
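model_config = ConfigDict(str_strip_whitespace=True) normalizes every str field at validation time, which matters for values like queue and lock_key that are used as lookup keys. A quick illustration on a toy model (made-up values):

from pydantic import BaseModel, ConfigDict

class Demo(BaseModel):
    model_config = ConfigDict(str_strip_whitespace=True)
    queue: str

assert Demo(queue="  etl.load  ").queue == "etl.load"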


@@ -0,0 +1,16 @@
# src/dataloader/storage/models/__init__.py
"""
ORM models for database access.
Organized by domain for scalability.
"""
from __future__ import annotations
from .base import Base
from .queue import DLJob, DLJobEvent, dl_status_enum
__all__ = [
"Base",
"DLJob",
"DLJobEvent",
"dl_status_enum",
]
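Because the package __init__ re-exports everything the old flat module exposed, existing imports keep working after the split; both forms below resolve to the same classes:

from dataloader.storage.models import Base, DLJob, dl_status_enum
from dataloader.storage.models.queue import DLJobEvent  # same class as the re-export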


@@ -0,0 +1,12 @@
# src/dataloader/storage/models/base.py
from __future__ import annotations
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
"""
Base class for all application ORM models.
Uses the SQLAlchemy 2.0+ declarative style.
"""
pass
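Concrete models subclass this Base using the typed Mapped/mapped_column API of SQLAlchemy 2.0. A minimal hypothetical model to show the style (the table name and columns are illustrative, not the real schema):

from sqlalchemy import BigInteger, Text
from sqlalchemy.orm import Mapped, mapped_column

from .base import Base

class ExampleRow(Base):
    __tablename__ = "example_rows"

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
    note: Mapped[str] = mapped_column(Text, default="")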


@@ -1,4 +1,4 @@
# src/dataloader/storage/models.py
# src/dataloader/storage/models/queue.py
from __future__ import annotations
from datetime import datetime
@@ -6,14 +6,9 @@ from typing import Any, Optional
from sqlalchemy import BigInteger, DateTime, Text
from sqlalchemy.dialects.postgresql import ENUM, JSONB, UUID
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.orm import Mapped, mapped_column
class Base(DeclarativeBase):
"""
Base class for all ORM models.
"""
pass
from .base import Base
dl_status_enum = ENUM(
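The hunk is truncated at the ENUM definition. Its values can be read off the CREATE TYPE statement in the old tests/conftest.py shown further down, so the definition presumably looks like this sketch (the name and create_type keyword arguments are assumptions):

# Sketch reconstructed from the CREATE TYPE dl_status statement in the test DDL.
dl_status_enum = ENUM(
    "queued", "running", "succeeded", "failed", "canceled", "lost",
    name="dl_status",
    create_type=False,
)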


@@ -0,0 +1,12 @@
# src/dataloader/storage/repositories/__init__.py
"""
Repositories for database access.
Organized by domain for scalability.
"""
from __future__ import annotations
from .queue import QueueRepository
__all__ = [
"QueueRepository",
]


@@ -1,4 +1,4 @@
# src/dataloader/storage/repositories.py
# src/dataloader/storage/repositories/queue.py
from __future__ import annotations
from datetime import datetime, timedelta, timezone


@@ -0,0 +1,13 @@
# src/dataloader/storage/schemas/__init__.py
"""
DTOs (Data Transfer Objects) for the storage layer.
Organized by domain for scalability.
"""
from __future__ import annotations
from .queue import CreateJobRequest, JobStatus
__all__ = [
"CreateJobRequest",
"JobStatus",
]


@@ -1,4 +1,4 @@
# src/dataloader/storage/schemas.py
# src/dataloader/storage/schemas/queue.py
from __future__ import annotations
from dataclasses import dataclass


@@ -1,179 +1,104 @@
# tests/conftest.py
from __future__ import annotations
import asyncio
import sys
from typing import AsyncGenerator
from uuid import uuid4
import pytest
import pytest_asyncio
from dotenv import load_dotenv
from httpx import AsyncClient, ASGITransport
from sqlalchemy import text
from sqlalchemy.ext.asyncio import (
AsyncEngine,
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from sqlalchemy.exc import ProgrammingError
from asyncpg.exceptions import InvalidCatalogNameError
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker
# Load .env before other imports to ensure config is available
load_dotenv()
from dataloader.api import app_main
from dataloader.api.v1.router import get_service
from dataloader.api.v1.service import JobsService
from dataloader.context import APP_CTX
from dataloader.storage.db import Base
from dataloader.config import APP_CONFIG
from dataloader.context import APP_CTX, get_session
from dataloader.storage.models import Base
from dataloader.storage.engine import create_engine, create_sessionmaker
# For Windows: use SelectorEventLoop which is more stable
if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
@pytest.fixture(scope="session")
@pytest_asyncio.fixture(scope="function")
async def db_engine() -> AsyncGenerator[AsyncEngine, None]:
"""
Creates a temporary, isolated test database for the entire test session.
- Connects to the default 'postgres' database to create a new test DB.
- Yields an engine connected to the new test DB.
- Drops the test DB after the session is complete.
Creates a test engine for each test.
Uses the real database from the config.
"""
pg_settings = APP_CONFIG.pg
test_db_name = f"{pg_settings.database}_test"
# DSN for connecting to 'postgres' DB to manage the test DB
system_dsn = pg_settings.url.replace(f"/{pg_settings.database}", "/postgres")
system_engine = create_async_engine(system_dsn, isolation_level="AUTOCOMMIT")
# DSN for the new test database
test_dsn = pg_settings.url.replace(f"/{pg_settings.database}", f"/{test_db_name}")
async with system_engine.connect() as conn:
# Drop the test DB if it exists from a previous failed run
await conn.execute(text(f"DROP DATABASE IF EXISTS {test_db_name} WITH (FORCE)"))
await conn.execute(text(f"CREATE DATABASE {test_db_name}"))
# Create an engine connected to the new test database
engine = create_async_engine(test_dsn)
# Create all tables and DDL objects
async with engine.begin() as conn:
# Execute DDL statements one by one
await conn.execute(text("CREATE TYPE dl_status AS ENUM ('queued','running','succeeded','failed','canceled','lost')"))
# Create tables
await conn.run_sync(Base.metadata.create_all)
# Create indexes
await conn.execute(text("CREATE INDEX ix_dl_jobs_claim ON dl_jobs(queue, available_at, priority, created_at) WHERE status = 'queued'"))
await conn.execute(text("CREATE INDEX ix_dl_jobs_running_lease ON dl_jobs(lease_expires_at) WHERE status = 'running'"))
await conn.execute(text("CREATE INDEX ix_dl_jobs_status_queue ON dl_jobs(status, queue)"))
# Create function and triggers
await conn.execute(
text(
"""
CREATE OR REPLACE FUNCTION notify_job_ready() RETURNS trigger AS $$
BEGIN
IF (TG_OP = 'INSERT') THEN
PERFORM pg_notify('dl_jobs', NEW.queue);
RETURN NEW;
ELSIF (TG_OP = 'UPDATE') THEN
IF NEW.status = 'queued' AND NEW.available_at <= now()
AND (OLD.status IS DISTINCT FROM NEW.status OR OLD.available_at IS DISTINCT FROM NEW.available_at) THEN
PERFORM pg_notify('dl_jobs', NEW.queue);
END IF;
RETURN NEW;
END IF;
RETURN NEW;
END $$ LANGUAGE plpgsql;
"""
)
)
await conn.execute(text("CREATE TRIGGER dl_jobs_notify_ins AFTER INSERT ON dl_jobs FOR EACH ROW EXECUTE FUNCTION notify_job_ready()"))
await conn.execute(text("CREATE TRIGGER dl_jobs_notify_upd AFTER UPDATE OF status, available_at ON dl_jobs FOR EACH ROW EXECUTE FUNCTION notify_job_ready()"))
engine = create_engine(APP_CONFIG.pg.url)
yield engine
# --- Teardown ---
await engine.dispose()
# Drop the test database
async with system_engine.connect() as conn:
await conn.execute(text(f"DROP DATABASE {test_db_name} WITH (FORCE)"))
await system_engine.dispose()
@pytest.fixture(scope="session")
async def app_context(db_engine: AsyncEngine) -> AsyncGenerator[None, None]:
"""
Overrides the global APP_CTX with the test database engine and sessionmaker.
This ensures that the app, when tested, uses the isolated test DB.
"""
original_engine = APP_CTX._engine
original_sessionmaker = APP_CTX._sessionmaker
APP_CTX._engine = db_engine
APP_CTX._sessionmaker = async_sessionmaker(db_engine, expire_on_commit=False, class_=AsyncSession)
yield
# Restore original context
APP_CTX._engine = original_engine
APP_CTX._sessionmaker = original_sessionmaker
@pytest.fixture
@pytest_asyncio.fixture(scope="function")
async def db_session(db_engine: AsyncEngine) -> AsyncGenerator[AsyncSession, None]:
"""
Provides a transactional session for tests.
A single connection is acquired from the engine, a transaction is started,
and a new session is created bound to that connection. At the end of the test,
the transaction is rolled back and the connection is closed, ensuring
perfect test isolation.
Provides a DB session for each test.
Does NOT use a transaction, so that advisory locks keep working.
"""
connection = await db_engine.connect()
trans = await connection.begin()
# Create a sessionmaker bound to the single connection
TestSession = async_sessionmaker(
bind=connection,
expire_on_commit=False,
class_=AsyncSession
)
session = TestSession()
try:
sessionmaker = async_sessionmaker(bind=db_engine, expire_on_commit=False, class_=AsyncSession)
async with sessionmaker() as session:
yield session
finally:
# Clean up the session, rollback the transaction, and close the connection
await session.close()
await trans.rollback()
await connection.close()
await session.rollback()
@pytest.fixture
@pytest_asyncio.fixture(scope="function")
async def clean_queue_tables(db_session: AsyncSession) -> None:
"""
Truncates the queue tables before each test.
"""
schema = APP_CONFIG.pg.schema_queue
await db_session.execute(text(f"TRUNCATE TABLE {schema}.dl_job_events CASCADE"))
await db_session.execute(text(f"TRUNCATE TABLE {schema}.dl_jobs CASCADE"))
await db_session.commit()
@pytest_asyncio.fixture
async def client(db_session: AsyncSession) -> AsyncGenerator[AsyncClient, None]:
"""
Provides an HTTP client for the FastAPI application.
It overrides the 'get_session' dependency to use the test-scoped,
transactional session provided by the 'db_session' fixture. This ensures
that API calls operate within the same transaction as the test function,
allowing for consistent state checks.
HTTP client for testing the API.
"""
async def override_get_session() -> AsyncGenerator[AsyncSession, None]:
"""This override simply yields the session created by the db_session fixture."""
yield db_session
# The service depends on the session, so we override the session provider
# that is used by the service via Depends(get_session)
from dataloader.context import get_session
app_main.dependency_overrides[get_session] = override_get_session
transport = ASGITransport(app=app_main)
async with AsyncClient(transport=transport, base_url="http://test") as c:
yield c
# Clean up the dependency override
app_main.dependency_overrides.clear()
@pytest.fixture
def job_id() -> str:
"""
Generates a unique job_id for tests.
"""
return str(uuid4())
@pytest.fixture
def queue_name() -> str:
"""
Returns the test queue name.
"""
return "test.queue"
@pytest.fixture
def task_name() -> str:
"""
Returns the test task name.
"""
return "test.task"


@@ -0,0 +1 @@
# tests/integration_tests/__init__.py


@@ -0,0 +1,172 @@
# tests/integration_tests/test_api_endpoints.py
from __future__ import annotations
import pytest
from httpx import AsyncClient
@pytest.mark.integration
class TestJobsAPI:
"""
Integration tests for the API endpoints.
"""
async def test_trigger_job_creates_new_job(
self,
client: AsyncClient,
clean_queue_tables,
queue_name: str,
task_name: str,
):
"""
Tests creating a new job via the API.
"""
payload = {
"queue": queue_name,
"task": task_name,
"args": {"test_key": "test_value"},
"lock_key": "lock_api_1",
"partition_key": "part1",
"priority": 100,
"max_attempts": 5,
"lease_ttl_sec": 60,
}
response = await client.post("/api/v1/jobs/trigger", json=payload)
assert response.status_code == 200
data = response.json()
assert "job_id" in data
assert data["status"] == "queued"
async def test_trigger_job_with_idempotency_key(
self,
client: AsyncClient,
clean_queue_tables,
queue_name: str,
task_name: str,
):
"""
Tests idempotency via idempotency_key.
"""
payload = {
"queue": queue_name,
"task": task_name,
"args": {},
"idempotency_key": "unique_key_123",
"lock_key": "lock_idem",
"priority": 100,
"max_attempts": 5,
"lease_ttl_sec": 60,
}
response1 = await client.post("/api/v1/jobs/trigger", json=payload)
response2 = await client.post("/api/v1/jobs/trigger", json=payload)
assert response1.status_code == 200
assert response2.status_code == 200
data1 = response1.json()
data2 = response2.json()
assert data1["job_id"] == data2["job_id"]
assert data1["status"] == data2["status"] == "queued"
async def test_get_status_returns_job_status(
self,
client: AsyncClient,
clean_queue_tables,
queue_name: str,
task_name: str,
):
"""
Tests fetching a job's status via the API.
"""
payload = {
"queue": queue_name,
"task": task_name,
"args": {},
"lock_key": "lock_status",
"priority": 100,
"max_attempts": 5,
"lease_ttl_sec": 60,
}
create_response = await client.post("/api/v1/jobs/trigger", json=payload)
job_id = create_response.json()["job_id"]
status_response = await client.get(f"/api/v1/jobs/{job_id}/status")
assert status_response.status_code == 200
data = status_response.json()
assert data["job_id"] == job_id
assert data["status"] == "queued"
assert data["attempt"] == 0
async def test_get_status_returns_404_for_nonexistent_job(
self,
client: AsyncClient,
clean_queue_tables,
):
"""
Tests fetching the status of a nonexistent job.
"""
fake_job_id = "00000000-0000-0000-0000-000000000000"
response = await client.get(f"/api/v1/jobs/{fake_job_id}/status")
assert response.status_code == 404
async def test_cancel_job_sets_cancel_flag(
self,
client: AsyncClient,
clean_queue_tables,
queue_name: str,
task_name: str,
):
"""
Tests cancelling a job via the API.
"""
payload = {
"queue": queue_name,
"task": task_name,
"args": {},
"lock_key": "lock_cancel",
"priority": 100,
"max_attempts": 5,
"lease_ttl_sec": 60,
}
create_response = await client.post("/api/v1/jobs/trigger", json=payload)
job_id = create_response.json()["job_id"]
cancel_response = await client.post(f"/api/v1/jobs/{job_id}/cancel")
assert cancel_response.status_code == 200
data = cancel_response.json()
assert data["job_id"] == job_id
async def test_cancel_nonexistent_job_returns_404(
self,
client: AsyncClient,
clean_queue_tables,
):
"""
Tests cancelling a nonexistent job.
"""
fake_job_id = "00000000-0000-0000-0000-000000000000"
response = await client.post(f"/api/v1/jobs/{fake_job_id}/cancel")
assert response.status_code == 404
async def test_health_endpoint_returns_200(
self,
client: AsyncClient,
):
"""
Tests the health check endpoint.
"""
response = await client.get("/health")
assert response.status_code == 200


@@ -0,0 +1,453 @@
# tests/integration_tests/test_queue_repository.py
from __future__ import annotations
from datetime import datetime, timezone
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from dataloader.storage.repositories import QueueRepository
from dataloader.storage.schemas import CreateJobRequest, JobStatus
@pytest.mark.integration
class TestQueueRepository:
"""
Integration tests for QueueRepository.
"""
async def test_create_or_get_creates_new_job(
self,
db_session: AsyncSession,
clean_queue_tables,
job_id: str,
queue_name: str,
task_name: str,
):
"""
Tests creating a new job in the queue.
"""
repo = QueueRepository(db_session)
req = CreateJobRequest(
job_id=job_id,
queue=queue_name,
task=task_name,
args={"param": "value"},
idempotency_key="test_key_1",
lock_key="test_lock_1",
partition_key="part1",
priority=100,
available_at=datetime.now(timezone.utc),
max_attempts=5,
lease_ttl_sec=60,
producer="test_producer",
consumer_group="test_group",
)
created_id, status = await repo.create_or_get(req)
assert created_id == job_id
assert status == "queued"
async def test_create_or_get_returns_existing_job(
self,
db_session: AsyncSession,
clean_queue_tables,
job_id: str,
queue_name: str,
task_name: str,
):
"""
Tests idempotency: a repeated call returns the existing job.
"""
repo = QueueRepository(db_session)
req = CreateJobRequest(
job_id=job_id,
queue=queue_name,
task=task_name,
args={},
idempotency_key="idempotent_key_1",
lock_key="lock1",
partition_key="",
priority=100,
available_at=datetime.now(timezone.utc),
max_attempts=5,
lease_ttl_sec=60,
producer=None,
consumer_group=None,
)
created_id_1, status_1 = await repo.create_or_get(req)
req_2 = CreateJobRequest(
job_id="different_job_id",
queue="different_queue",
task="different_task",
args={},
idempotency_key="idempotent_key_1",
lock_key="lock2",
partition_key="",
priority=200,
available_at=datetime.now(timezone.utc),
max_attempts=3,
lease_ttl_sec=30,
producer=None,
consumer_group=None,
)
created_id_2, status_2 = await repo.create_or_get(req_2)
assert created_id_1 == created_id_2 == job_id
assert status_1 == status_2 == "queued"
async def test_get_status_returns_job_status(
self,
db_session: AsyncSession,
clean_queue_tables,
job_id: str,
queue_name: str,
task_name: str,
):
"""
Tests fetching a job's status.
"""
repo = QueueRepository(db_session)
req = CreateJobRequest(
job_id=job_id,
queue=queue_name,
task=task_name,
args={"key": "val"},
idempotency_key=None,
lock_key="lock",
partition_key="",
priority=100,
available_at=datetime.now(timezone.utc),
max_attempts=5,
lease_ttl_sec=60,
producer=None,
consumer_group=None,
)
await repo.create_or_get(req)
status = await repo.get_status(job_id)
assert status is not None
assert status.job_id == job_id
assert status.status == "queued"
assert status.attempt == 0
assert status.error is None
async def test_get_status_returns_none_for_nonexistent_job(
self,
db_session: AsyncSession,
clean_queue_tables,
):
"""
Tests fetching the status of a nonexistent job.
"""
repo = QueueRepository(db_session)
status = await repo.get_status("00000000-0000-0000-0000-000000000000")
assert status is None
async def test_cancel_sets_cancel_requested_flag(
self,
db_session: AsyncSession,
clean_queue_tables,
job_id: str,
queue_name: str,
task_name: str,
):
"""
Tests setting the cancellation flag.
"""
repo = QueueRepository(db_session)
req = CreateJobRequest(
job_id=job_id,
queue=queue_name,
task=task_name,
args={},
idempotency_key=None,
lock_key="lock",
partition_key="",
priority=100,
available_at=datetime.now(timezone.utc),
max_attempts=5,
lease_ttl_sec=60,
producer=None,
consumer_group=None,
)
await repo.create_or_get(req)
result = await repo.cancel(job_id)
assert result is True
status = await repo.get_status(job_id)
assert status is not None
async def test_claim_one_returns_job_for_processing(
self,
db_session: AsyncSession,
clean_queue_tables,
job_id: str,
queue_name: str,
task_name: str,
):
"""
Tests claiming a job for processing.
"""
repo = QueueRepository(db_session)
req = CreateJobRequest(
job_id=job_id,
queue=queue_name,
task=task_name,
args={"data": "test"},
idempotency_key=None,
lock_key="lock_claim",
partition_key="partition1",
priority=50,
available_at=datetime.now(timezone.utc),
max_attempts=5,
lease_ttl_sec=120,
producer=None,
consumer_group=None,
)
await repo.create_or_get(req)
claimed = await repo.claim_one(queue_name, claim_backoff_sec=15)
assert claimed is not None
assert claimed["job_id"] == job_id
assert claimed["queue"] == queue_name
assert claimed["task"] == task_name
assert claimed["args"] == {"data": "test"}
assert claimed["lock_key"] == "lock_claim"
assert claimed["attempt"] == 1
status = await repo.get_status(job_id)
assert status is not None
assert status.status == "running"
assert status.attempt == 1
async def test_claim_one_returns_none_when_no_jobs(
self,
db_session: AsyncSession,
clean_queue_tables,
queue_name: str,
):
"""
Tests claiming from an empty queue.
"""
repo = QueueRepository(db_session)
claimed = await repo.claim_one(queue_name, claim_backoff_sec=15)
assert claimed is None
async def test_heartbeat_updates_lease(
self,
db_session: AsyncSession,
clean_queue_tables,
job_id: str,
queue_name: str,
task_name: str,
):
"""
Tests heartbeat updates and lease extension.
"""
repo = QueueRepository(db_session)
req = CreateJobRequest(
job_id=job_id,
queue=queue_name,
task=task_name,
args={},
idempotency_key=None,
lock_key="lock_hb",
partition_key="",
priority=100,
available_at=datetime.now(timezone.utc),
max_attempts=5,
lease_ttl_sec=60,
producer=None,
consumer_group=None,
)
await repo.create_or_get(req)
await repo.claim_one(queue_name, claim_backoff_sec=15)
success, cancel_requested = await repo.heartbeat(job_id, ttl_sec=90)
assert success is True
assert cancel_requested is False
async def test_finish_ok_marks_job_succeeded(
self,
db_session: AsyncSession,
clean_queue_tables,
job_id: str,
queue_name: str,
task_name: str,
):
"""
Tests successful job completion.
"""
repo = QueueRepository(db_session)
req = CreateJobRequest(
job_id=job_id,
queue=queue_name,
task=task_name,
args={},
idempotency_key=None,
lock_key="lock_finish",
partition_key="",
priority=100,
available_at=datetime.now(timezone.utc),
max_attempts=5,
lease_ttl_sec=60,
producer=None,
consumer_group=None,
)
await repo.create_or_get(req)
await repo.claim_one(queue_name, claim_backoff_sec=15)
await repo.finish_ok(job_id)
status = await repo.get_status(job_id)
assert status is not None
assert status.status == "succeeded"
assert status.finished_at is not None
async def test_finish_fail_or_retry_requeues_on_retry(
self,
db_session: AsyncSession,
clean_queue_tables,
job_id: str,
queue_name: str,
task_name: str,
):
"""
Tests re-enqueueing on failure while a retry is still possible.
"""
repo = QueueRepository(db_session)
req = CreateJobRequest(
job_id=job_id,
queue=queue_name,
task=task_name,
args={},
idempotency_key=None,
lock_key="lock_retry",
partition_key="",
priority=100,
available_at=datetime.now(timezone.utc),
max_attempts=3,
lease_ttl_sec=60,
producer=None,
consumer_group=None,
)
await repo.create_or_get(req)
await repo.claim_one(queue_name, claim_backoff_sec=15)
await repo.finish_fail_or_retry(job_id, err="Test error")
status = await repo.get_status(job_id)
assert status is not None
assert status.status == "queued"
assert status.error == "Test error"
assert status.attempt == 1
async def test_finish_fail_or_retry_marks_failed_when_max_attempts_reached(
self,
db_session: AsyncSession,
clean_queue_tables,
job_id: str,
queue_name: str,
task_name: str,
):
"""
Tests terminal failure once max_attempts is reached.
"""
repo = QueueRepository(db_session)
req = CreateJobRequest(
job_id=job_id,
queue=queue_name,
task=task_name,
args={},
idempotency_key=None,
lock_key="lock_fail",
partition_key="",
priority=100,
available_at=datetime.now(timezone.utc),
max_attempts=1,
lease_ttl_sec=60,
producer=None,
consumer_group=None,
)
await repo.create_or_get(req)
await repo.claim_one(queue_name, claim_backoff_sec=15)
await repo.finish_fail_or_retry(job_id, err="Final error")
status = await repo.get_status(job_id)
assert status is not None
assert status.status == "failed"
assert status.error == "Final error"
assert status.finished_at is not None
async def test_requeue_lost_returns_expired_jobs(
self,
db_session: AsyncSession,
clean_queue_tables,
job_id: str,
queue_name: str,
task_name: str,
):
"""
Tests the reaper: stale jobs are returned to the queue.
"""
repo = QueueRepository(db_session)
req = CreateJobRequest(
job_id=job_id,
queue=queue_name,
task=task_name,
args={},
idempotency_key=None,
lock_key="lock_lost",
partition_key="",
priority=100,
available_at=datetime.now(timezone.utc),
max_attempts=5,
lease_ttl_sec=1,
producer=None,
consumer_group=None,
)
await repo.create_or_get(req)
await repo.claim_one(queue_name, claim_backoff_sec=15)
import asyncio
await asyncio.sleep(2)
requeued = await repo.requeue_lost()
assert job_id in requeued
status = await repo.get_status(job_id)
assert status is not None
assert status.status == "queued"
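Taken together, these tests pin down the worker protocol: claim_one, periodic heartbeat while working, then finish_ok or finish_fail_or_retry, with requeue_lost as the reaper for expired leases. A hedged sketch of the loop a worker could run against this repository (the poll interval, task dispatch, and error handling are assumptions, not code from the repo):

import asyncio

from dataloader.storage.repositories import QueueRepository

async def worker_loop(repo: QueueRepository, queue: str) -> None:
    while True:
        job = await repo.claim_one(queue, claim_backoff_sec=15)
        if job is None:
            # Idle backoff; LISTEN dl_jobs (see the notify_job_ready trigger
            # in the old conftest DDL) could replace plain polling.
            await asyncio.sleep(1.0)
            continue
        try:
            # ... dispatch job["task"] with job["args"] here ...
            await repo.heartbeat(job["job_id"], ttl_sec=60)  # extend the lease
            await repo.finish_ok(job["job_id"])
        except Exception as exc:
            await repo.finish_fail_or_retry(job["job_id"], err=str(exc))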


@@ -1,93 +0,0 @@
# tests/integrations/test_api_v1.py
import pytest
from httpx import AsyncClient
@pytest.mark.anyio
async def test_trigger_and_get_status_ok(client: AsyncClient):
"""
Verifies successful job creation and status retrieval.
"""
# 1. Trigger the job
trigger_payload = {
"queue": "test_queue",
"task": "test_task",
"lock_key": "lock_123",
}
response = await client.post("/api/v1/jobs/trigger", json=trigger_payload)
# Check the trigger response
assert response.status_code == 200
response_data = response.json()
assert "job_id" in response_data
assert response_data["status"] == "queued"
job_id = response_data["job_id"]
# 2. Fetch the status
response = await client.get(f"/api/v1/jobs/{job_id}/status")
# Check the status response
assert response.status_code == 200
status_data = response.json()
assert status_data["job_id"] == job_id
assert status_data["status"] == "queued"
assert status_data["attempt"] == 0
@pytest.mark.anyio
async def test_cancel_job_ok(client: AsyncClient):
"""
Verifies successful job cancellation.
"""
# 1. Trigger the job
trigger_payload = {
"queue": "cancel_queue",
"task": "cancel_task",
"lock_key": "lock_cancel",
}
response = await client.post("/api/v1/jobs/trigger", json=trigger_payload)
assert response.status_code == 200
job_id = response.json()["job_id"]
# 2. Request cancellation
response = await client.post(f"/api/v1/jobs/{job_id}/cancel")
assert response.status_code == 200
cancel_data = response.json()
assert cancel_data["job_id"] == job_id
# The worker has not picked up the job yet, so the status is still queued, but cancel_requested is already true
assert cancel_data["status"] == "queued"
# 3. Check the status after cancellation
response = await client.get(f"/api/v1/jobs/{job_id}/status")
assert response.status_code == 200
status_data = response.json()
assert status_data["job_id"] == job_id
# The job is not cancelled yet; cancellation has only been requested
assert status_data["status"] == "queued"
@pytest.mark.anyio
async def test_trigger_duplicate_idempotency_key(client: AsyncClient):
"""
Verifies that resubmitting with the same idempotency key
returns the same job.
"""
idempotency_key = "idem_key_123"
trigger_payload = {
"queue": "idempotent_queue",
"task": "idempotent_task",
"lock_key": "lock_idem",
"idempotency_key": idempotency_key,
}
# 1. First request
response1 = await client.post("/api/v1/jobs/trigger", json=trigger_payload)
assert response1.status_code == 200
job_id1 = response1.json()["job_id"]
# 2. Second request with the same key
response2 = await client.post("/api/v1/jobs/trigger", json=trigger_payload)
assert response2.status_code == 200
job_id2 = response2.json()["job_id"]
# The job IDs must match
assert job_id1 == job_id2


@@ -1,155 +0,0 @@
# tests/integrations/test_worker_protocol.py
import asyncio
from datetime import datetime, timedelta, timezone
from uuid import uuid4
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from dataloader.storage.repositories import QueueRepository, CreateJobRequest
from dataloader.context import APP_CTX
@pytest.mark.anyio
async def test_e2e_worker_protocol_ok(db_session: AsyncSession):
"""
Verifies the full end-to-end job lifecycle:
1. Enqueue (create)
2. Claim (claim)
3. Heartbeat (heartbeat)
4. Successful completion (finish_ok)
5. Status check
"""
repo = QueueRepository(db_session)
job_id = str(uuid4())
queue_name = "e2e_ok_queue"
lock_key = f"lock_{job_id}"
# 1. Enqueue the job
create_req = CreateJobRequest(
job_id=job_id,
queue=queue_name,
task="test_e2e_task",
args={},
idempotency_key=None,
lock_key=lock_key,
partition_key="",
priority=100,
available_at=datetime.now(timezone.utc),
max_attempts=3,
lease_ttl_sec=30,
producer=None,
consumer_group=None,
)
await repo.create_or_get(create_req)
# 2. Claim the job
claimed_job = await repo.claim_one(queue_name, claim_backoff_sec=10)
assert claimed_job is not None
assert claimed_job["job_id"] == job_id
assert claimed_job["lock_key"] == lock_key
# 3. Heartbeat
success, cancel_requested = await repo.heartbeat(job_id, ttl_sec=60)
assert success
assert not cancel_requested
# 4. Finish successfully
await repo.finish_ok(job_id)
# 5. Check the status
status = await repo.get_status(job_id)
assert status is not None
assert status.status == "succeeded"
assert status.finished_at is not None
@pytest.mark.anyio
async def test_concurrency_claim_one_locks(db_session: AsyncSession):
"""
Verifies that under concurrent access to jobs with the same
lock_key, only one worker can claim a job.
"""
repo = QueueRepository(db_session)
queue_name = "concurrency_queue"
lock_key = "concurrent_lock_123"
job_ids = [str(uuid4()), str(uuid4())]
# 1. Create two jobs with the same lock_key
for i, job_id in enumerate(job_ids):
create_req = CreateJobRequest(
job_id=job_id,
queue=queue_name,
task=f"task_{i}",
args={},
idempotency_key=f"idem_con_{i}",
lock_key=lock_key,
partition_key="",
priority=100 + i,
available_at=datetime.now(timezone.utc),
max_attempts=1,
lease_ttl_sec=30,
producer="test",
consumer_group="test_group",
)
await repo.create_or_get(create_req)
# 2. The first worker claims a job
claimed_job_1 = await repo.claim_one(queue_name, claim_backoff_sec=1)
assert claimed_job_1 is not None
assert claimed_job_1["job_id"] == job_ids[0]
# 3. The second worker tries to claim a job but cannot (due to the advisory lock)
claimed_job_2 = await repo.claim_one(queue_name, claim_backoff_sec=1)
assert claimed_job_2 is None
# 4. The first worker releases the advisory lock (as if it had finished)
await repo._advisory_unlock(lock_key)
# 5. The second worker can now claim the second job
claimed_job_3 = await repo.claim_one(queue_name, claim_backoff_sec=1)
assert claimed_job_3 is not None
assert claimed_job_3["job_id"] == job_ids[1]
@pytest.mark.anyio
async def test_reaper_requeues_lost_jobs(db_session: AsyncSession):
"""
Verifies that the reaper correctly returns "lost" jobs to the queue.
"""
repo = QueueRepository(db_session)
job_id = str(uuid4())
queue_name = "reaper_queue"
# 1. Create and claim a job
create_req = CreateJobRequest(
job_id=job_id,
queue=queue_name,
task="reaper_test_task",
args={},
idempotency_key="idem_reaper_1",
lock_key="reaper_lock_1",
partition_key="",
priority=100,
available_at=datetime.now(timezone.utc),
max_attempts=3,
lease_ttl_sec=1,  # very short lease
producer=None,
consumer_group=None,
)
await repo.create_or_get(create_req)
claimed_job = await repo.claim_one(queue_name, claim_backoff_sec=1)
assert claimed_job is not None
assert claimed_job["job_id"] == job_id
# 2. Wait for the lease to expire
await asyncio.sleep(2)
# 3. Run the reaper
requeued_ids = await repo.requeue_lost()
assert requeued_ids == [job_id]
# 4. Check the status
status = await repo.get_status(job_id)
assert status is not None
assert status.status == "queued"