feat: add router

This commit is contained in:
itqop 2025-11-05 01:44:22 +03:00
parent 33d8f5ab8b
commit 18cbbe00d3
3 changed files with 197 additions and 70 deletions

View File

@ -1,14 +1,68 @@
"""Агрегатор v1-роутов.
# src/dataloader/api/v1/router.py
from __future__ import annotations
Экспортирует готовый `router`, собранный из модульных роутеров в пакете `routes`.
Оставлен как тонкий слой для обратной совместимости импортов `from dataloader.api.v1 import router`.
from collections.abc import AsyncGenerator
from http import HTTPStatus
from typing import Annotated
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException
from dataloader.api.v1.schemas import (
CancelJobResponse,
JobStatusResponse,
TriggerJobRequest,
TriggerJobResponse,
)
from dataloader.api.v1.service import JobsService
from dataloader.storage.db import session_scope
router = APIRouter(prefix="/api/v1/jobs", tags=["jobs"])
async def get_service() -> AsyncGenerator[JobsService, None]:
"""
from fastapi import APIRouter
Создаёт JobsService с новой сессией и корректно закрывает её после запроса.
"""
async for s in session_scope():
yield JobsService(s)
router = APIRouter()
@router.post("/trigger", response_model=TriggerJobResponse, status_code=HTTPStatus.OK)
async def trigger_job(
payload: TriggerJobRequest,
svc: Annotated[JobsService, Depends(get_service)],
) -> TriggerJobResponse:
"""
Создаёт или возвращает существующую задачу по idempotency_key.
"""
return await svc.trigger(payload)
@router.get("/{job_id}/status", response_model=JobStatusResponse, status_code=HTTPStatus.OK)
async def get_status(
job_id: UUID,
svc: Annotated[JobsService, Depends(get_service)],
) -> JobStatusResponse:
"""
Возвращает статус задачи по идентификатору.
"""
st = await svc.status(job_id)
if not st:
raise HTTPException(status_code=HTTPStatus.NOT_FOUND, detail="job not found")
return st
__all__ = ["router"]
@router.post("/{job_id}/cancel", response_model=CancelJobResponse, status_code=HTTPStatus.OK)
async def cancel_job(
job_id: UUID,
svc: Annotated[JobsService, Depends(get_service)],
) -> CancelJobResponse:
"""
Запрашивает отмену задачи.
"""
st = await svc.cancel(job_id)
if not st:
raise HTTPException(status_code=HTTPStatus.NOT_FOUND, detail="job not found")
return st

View File

@ -1,49 +1,70 @@
"""Pydantic схемы для API v1: запросы и ответы."""
# src/dataloader/api/v1/schemas.py
from __future__ import annotations
from pydantic import BaseModel, Field
from typing import Optional, Dict, Any
from datetime import datetime
from uuid import UUID
from datetime import datetime, timezone
from typing import Any, Optional
from uuid import UUID, uuid4
from pydantic import BaseModel, Field, field_validator
class JobTriggerRequest(BaseModel):
"""Запрос на постановку задачи в очередь."""
queue: str = Field(..., description="Название очереди")
task: str = Field(..., description="Тип задачи")
args: Optional[Dict[str, Any]] = Field(default={}, description="Аргументы задачи")
idempotency_key: Optional[str] = Field(None, description="Ключ идемпотентности")
lock_key: str = Field(..., description="Ключ блокировки")
partition_key: Optional[str] = Field(default="", description="Ключ партиционирования")
priority: Optional[int] = Field(default=100, description="Приоритет задачи")
available_at: Optional[datetime] = Field(None, description="Время доступности задачи (RFC3339)")
class TriggerJobRequest(BaseModel):
"""
Запрос на постановку задачи в очередь.
"""
queue: str = Field(...)
task: str = Field(...)
args: dict[str, Any] = Field(default_factory=dict)
idempotency_key: Optional[str] = Field(default=None)
lock_key: str = Field(...)
partition_key: str = Field(default="")
priority: int = Field(default=100, ge=0)
available_at: Optional[datetime] = Field(default=None)
max_attempts: int = Field(default=5, ge=0)
lease_ttl_sec: int = Field(default=60, gt=0)
producer: Optional[str] = Field(default=None)
consumer_group: Optional[str] = Field(default=None)
@field_validator("available_at")
@classmethod
def _ensure_tz(cls, v: Optional[datetime]) -> Optional[datetime]:
if v is None:
return None
return v if v.tzinfo else v.replace(tzinfo=timezone.utc)
class JobTriggerResponse(BaseModel):
"""Ответ на постановку задачи."""
job_id: UUID = Field(..., description="Идентификатор задачи")
status: str = Field(..., description="Статус задачи")
class TriggerJobResponse(BaseModel):
"""
Ответ на постановку задачи.
"""
job_id: UUID = Field(...)
status: str = Field(...)
class JobStatusResponse(BaseModel):
"""Ответ со статусом задачи."""
job_id: UUID
status: str
attempt: int
started_at: Optional[datetime] = None
finished_at: Optional[datetime] = None
heartbeat_at: Optional[datetime] = None
error: Optional[str] = None
progress: Dict[str, Any] = Field(default_factory=dict)
"""
Текущий статус задачи.
"""
job_id: UUID = Field(...)
status: str = Field(...)
attempt: int = Field(...)
started_at: Optional[datetime] = Field(default=None)
finished_at: Optional[datetime] = Field(default=None)
heartbeat_at: Optional[datetime] = Field(default=None)
error: Optional[str] = Field(default=None)
progress: dict[str, Any] = Field(default_factory=dict)
class JobCancelResponse(BaseModel):
"""Ответ на отмену задачи."""
job_id: UUID
status: str
attempt: int
started_at: Optional[datetime] = None
finished_at: Optional[datetime] = None
heartbeat_at: Optional[datetime] = None
error: Optional[str] = None
progress: Dict[str, Any] = Field(default_factory=dict)
class CancelJobResponse(BaseModel):
"""
Ответ на запрос отмены задачи.
"""
job_id: UUID = Field(...)
status: str = Field(...)
def new_job_id() -> UUID:
"""
Возвращает новый UUID для идентификатора задачи.
"""
return uuid4()

View File

@ -1,31 +1,83 @@
"""Бизнес-логика для API v1."""
# src/dataloader/api/v1/service.py
from __future__ import annotations
from typing import Optional
from datetime import datetime, timezone
from typing import Any, Optional
from uuid import UUID
from datetime import datetime
from .schemas import JobTriggerRequest, JobStatusResponse, JobCancelResponse
from ...storage.repositories import JobRepository
from sqlalchemy.ext.asyncio import AsyncSession
from dataloader.api.v1.schemas import (
CancelJobResponse,
JobStatusResponse,
TriggerJobRequest,
TriggerJobResponse,
new_job_id,
)
from dataloader.storage.repositories import (
CreateJobRequest,
QueueRepository,
)
from dataloader.context import APP_CTX
class JobService:
"""Сервис для работы с задачами."""
class JobsService:
"""
Бизнес-логика работы с очередью задач.
"""
def __init__(self, session: AsyncSession):
self._s = session
self._repo = QueueRepository(self._s)
self._log = APP_CTX.get_logger()
def __init__(self, job_repo: JobRepository):
self.job_repo = job_repo
async def trigger(self, req: TriggerJobRequest) -> TriggerJobResponse:
"""
Идемпотентно ставит задачу в очередь и возвращает её идентификатор и статус.
"""
job_uuid: UUID = new_job_id()
dt = req.available_at or datetime.now(timezone.utc)
creq = CreateJobRequest(
job_id=str(job_uuid),
queue=req.queue,
task=req.task,
args=req.args or {},
idempotency_key=req.idempotency_key,
lock_key=req.lock_key,
partition_key=req.partition_key or "",
priority=int(req.priority),
available_at=dt,
max_attempts=int(req.max_attempts),
lease_ttl_sec=int(req.lease_ttl_sec),
producer=req.producer,
consumer_group=req.consumer_group,
)
job_id, status = await self._repo.create_or_get(creq)
return TriggerJobResponse(job_id=UUID(job_id), status=status)
async def trigger_job(self, request: JobTriggerRequest) -> JobTriggerResponse:
"""Постановка задачи в очередь."""
# TODO: реализовать идемпотентную постановку через репозиторий
raise NotImplementedError
async def get_job_status(self, job_id: UUID) -> Optional[JobStatusResponse]:
"""Получение статуса задачи."""
# TODO: реализовать через репозиторий
raise NotImplementedError
async def cancel_job(self, job_id: UUID) -> Optional[JobCancelResponse]:
"""Отмена задачи."""
# TODO: реализовать через репозиторий
raise NotImplementedError
async def status(self, job_id: UUID) -> Optional[JobStatusResponse]:
"""
Возвращает статус задачи.
"""
st = await self._repo.get_status(str(job_id))
if not st:
return None
return JobStatusResponse(
job_id=UUID(st.job_id),
status=st.status,
attempt=st.attempt,
started_at=st.started_at,
finished_at=st.finished_at,
heartbeat_at=st.heartbeat_at,
error=st.error,
progress=st.progress or {},
)
async def cancel(self, job_id: UUID) -> Optional[CancelJobResponse]:
"""
Запрашивает отмену задачи и возвращает её текущее состояние.
"""
await self._repo.cancel(str(job_id))
st = await self._repo.get_status(str(job_id))
if not st:
return None
return CancelJobResponse(job_id=UUID(st.job_id), status=st.status)