diff --git a/src/dataloader/api/v1/router.py b/src/dataloader/api/v1/router.py index d083059..4b48f10 100644 --- a/src/dataloader/api/v1/router.py +++ b/src/dataloader/api/v1/router.py @@ -1,14 +1,68 @@ -"""Агрегатор v1-роутов. +# src/dataloader/api/v1/router.py +from __future__ import annotations -Экспортирует готовый `router`, собранный из модульных роутеров в пакете `routes`. -Оставлен как тонкий слой для обратной совместимости импортов `from dataloader.api.v1 import router`. -""" +from collections.abc import AsyncGenerator +from http import HTTPStatus +from typing import Annotated +from uuid import UUID -from fastapi import APIRouter +from fastapi import APIRouter, Depends, HTTPException + +from dataloader.api.v1.schemas import ( + CancelJobResponse, + JobStatusResponse, + TriggerJobRequest, + TriggerJobResponse, +) +from dataloader.api.v1.service import JobsService +from dataloader.storage.db import session_scope -router = APIRouter() +router = APIRouter(prefix="/api/v1/jobs", tags=["jobs"]) +async def get_service() -> AsyncGenerator[JobsService, None]: + """ + Создаёт JobsService с новой сессией и корректно закрывает её после запроса. + """ + async for s in session_scope(): + yield JobsService(s) -__all__ = ["router"] + +@router.post("/trigger", response_model=TriggerJobResponse, status_code=HTTPStatus.OK) +async def trigger_job( + payload: TriggerJobRequest, + svc: Annotated[JobsService, Depends(get_service)], +) -> TriggerJobResponse: + """ + Создаёт или возвращает существующую задачу по idempotency_key. + """ + return await svc.trigger(payload) + + +@router.get("/{job_id}/status", response_model=JobStatusResponse, status_code=HTTPStatus.OK) +async def get_status( + job_id: UUID, + svc: Annotated[JobsService, Depends(get_service)], +) -> JobStatusResponse: + """ + Возвращает статус задачи по идентификатору. + """ + st = await svc.status(job_id) + if not st: + raise HTTPException(status_code=HTTPStatus.NOT_FOUND, detail="job not found") + return st + + +@router.post("/{job_id}/cancel", response_model=CancelJobResponse, status_code=HTTPStatus.OK) +async def cancel_job( + job_id: UUID, + svc: Annotated[JobsService, Depends(get_service)], +) -> CancelJobResponse: + """ + Запрашивает отмену задачи. + """ + st = await svc.cancel(job_id) + if not st: + raise HTTPException(status_code=HTTPStatus.NOT_FOUND, detail="job not found") + return st diff --git a/src/dataloader/api/v1/schemas.py b/src/dataloader/api/v1/schemas.py index 9dae8ff..0643eeb 100644 --- a/src/dataloader/api/v1/schemas.py +++ b/src/dataloader/api/v1/schemas.py @@ -1,49 +1,70 @@ -"""Pydantic схемы для API v1: запросы и ответы.""" +# src/dataloader/api/v1/schemas.py +from __future__ import annotations -from pydantic import BaseModel, Field -from typing import Optional, Dict, Any -from datetime import datetime -from uuid import UUID +from datetime import datetime, timezone +from typing import Any, Optional +from uuid import UUID, uuid4 + +from pydantic import BaseModel, Field, field_validator -class JobTriggerRequest(BaseModel): - """Запрос на постановку задачи в очередь.""" - queue: str = Field(..., description="Название очереди") - task: str = Field(..., description="Тип задачи") - args: Optional[Dict[str, Any]] = Field(default={}, description="Аргументы задачи") - idempotency_key: Optional[str] = Field(None, description="Ключ идемпотентности") - lock_key: str = Field(..., description="Ключ блокировки") - partition_key: Optional[str] = Field(default="", description="Ключ партиционирования") - priority: Optional[int] = Field(default=100, description="Приоритет задачи") - available_at: Optional[datetime] = Field(None, description="Время доступности задачи (RFC3339)") +class TriggerJobRequest(BaseModel): + """ + Запрос на постановку задачи в очередь. + """ + queue: str = Field(...) + task: str = Field(...) + args: dict[str, Any] = Field(default_factory=dict) + idempotency_key: Optional[str] = Field(default=None) + lock_key: str = Field(...) + partition_key: str = Field(default="") + priority: int = Field(default=100, ge=0) + available_at: Optional[datetime] = Field(default=None) + max_attempts: int = Field(default=5, ge=0) + lease_ttl_sec: int = Field(default=60, gt=0) + producer: Optional[str] = Field(default=None) + consumer_group: Optional[str] = Field(default=None) + + @field_validator("available_at") + @classmethod + def _ensure_tz(cls, v: Optional[datetime]) -> Optional[datetime]: + if v is None: + return None + return v if v.tzinfo else v.replace(tzinfo=timezone.utc) -class JobTriggerResponse(BaseModel): - """Ответ на постановку задачи.""" - job_id: UUID = Field(..., description="Идентификатор задачи") - status: str = Field(..., description="Статус задачи") +class TriggerJobResponse(BaseModel): + """ + Ответ на постановку задачи. + """ + job_id: UUID = Field(...) + status: str = Field(...) class JobStatusResponse(BaseModel): - """Ответ со статусом задачи.""" - job_id: UUID - status: str - attempt: int - started_at: Optional[datetime] = None - finished_at: Optional[datetime] = None - heartbeat_at: Optional[datetime] = None - error: Optional[str] = None - progress: Dict[str, Any] = Field(default_factory=dict) + """ + Текущий статус задачи. + """ + job_id: UUID = Field(...) + status: str = Field(...) + attempt: int = Field(...) + started_at: Optional[datetime] = Field(default=None) + finished_at: Optional[datetime] = Field(default=None) + heartbeat_at: Optional[datetime] = Field(default=None) + error: Optional[str] = Field(default=None) + progress: dict[str, Any] = Field(default_factory=dict) -class JobCancelResponse(BaseModel): - """Ответ на отмену задачи.""" - job_id: UUID - status: str - attempt: int - started_at: Optional[datetime] = None - finished_at: Optional[datetime] = None - heartbeat_at: Optional[datetime] = None - error: Optional[str] = None - progress: Dict[str, Any] = Field(default_factory=dict) +class CancelJobResponse(BaseModel): + """ + Ответ на запрос отмены задачи. + """ + job_id: UUID = Field(...) + status: str = Field(...) + +def new_job_id() -> UUID: + """ + Возвращает новый UUID для идентификатора задачи. + """ + return uuid4() diff --git a/src/dataloader/api/v1/service.py b/src/dataloader/api/v1/service.py index a0d5f0e..dd75782 100644 --- a/src/dataloader/api/v1/service.py +++ b/src/dataloader/api/v1/service.py @@ -1,31 +1,83 @@ -"""Бизнес-логика для API v1.""" +# src/dataloader/api/v1/service.py +from __future__ import annotations -from typing import Optional +from datetime import datetime, timezone +from typing import Any, Optional from uuid import UUID -from datetime import datetime -from .schemas import JobTriggerRequest, JobStatusResponse, JobCancelResponse -from ...storage.repositories import JobRepository +from sqlalchemy.ext.asyncio import AsyncSession + +from dataloader.api.v1.schemas import ( + CancelJobResponse, + JobStatusResponse, + TriggerJobRequest, + TriggerJobResponse, + new_job_id, +) +from dataloader.storage.repositories import ( + CreateJobRequest, + QueueRepository, +) +from dataloader.context import APP_CTX -class JobService: - """Сервис для работы с задачами.""" - - def __init__(self, job_repo: JobRepository): - self.job_repo = job_repo - - async def trigger_job(self, request: JobTriggerRequest) -> JobTriggerResponse: - """Постановка задачи в очередь.""" - # TODO: реализовать идемпотентную постановку через репозиторий - raise NotImplementedError - - async def get_job_status(self, job_id: UUID) -> Optional[JobStatusResponse]: - """Получение статуса задачи.""" - # TODO: реализовать через репозиторий - raise NotImplementedError - - async def cancel_job(self, job_id: UUID) -> Optional[JobCancelResponse]: - """Отмена задачи.""" - # TODO: реализовать через репозиторий - raise NotImplementedError +class JobsService: + """ + Бизнес-логика работы с очередью задач. + """ + def __init__(self, session: AsyncSession): + self._s = session + self._repo = QueueRepository(self._s) + self._log = APP_CTX.get_logger() + async def trigger(self, req: TriggerJobRequest) -> TriggerJobResponse: + """ + Идемпотентно ставит задачу в очередь и возвращает её идентификатор и статус. + """ + job_uuid: UUID = new_job_id() + dt = req.available_at or datetime.now(timezone.utc) + creq = CreateJobRequest( + job_id=str(job_uuid), + queue=req.queue, + task=req.task, + args=req.args or {}, + idempotency_key=req.idempotency_key, + lock_key=req.lock_key, + partition_key=req.partition_key or "", + priority=int(req.priority), + available_at=dt, + max_attempts=int(req.max_attempts), + lease_ttl_sec=int(req.lease_ttl_sec), + producer=req.producer, + consumer_group=req.consumer_group, + ) + job_id, status = await self._repo.create_or_get(creq) + return TriggerJobResponse(job_id=UUID(job_id), status=status) + + async def status(self, job_id: UUID) -> Optional[JobStatusResponse]: + """ + Возвращает статус задачи. + """ + st = await self._repo.get_status(str(job_id)) + if not st: + return None + return JobStatusResponse( + job_id=UUID(st.job_id), + status=st.status, + attempt=st.attempt, + started_at=st.started_at, + finished_at=st.finished_at, + heartbeat_at=st.heartbeat_at, + error=st.error, + progress=st.progress or {}, + ) + + async def cancel(self, job_id: UUID) -> Optional[CancelJobResponse]: + """ + Запрашивает отмену задачи и возвращает её текущее состояние. + """ + await self._repo.cancel(str(job_id)) + st = await self._repo.get_status(str(job_id)) + if not st: + return None + return CancelJobResponse(job_id=UUID(st.job_id), status=st.status)