tests: add new tests

This commit is contained in:
itqop 2025-11-05 19:01:33 +03:00
parent 6364c97f21
commit e0829d66f8
4 changed files with 94 additions and 11 deletions

View File

@ -13,12 +13,4 @@ class JobNotFoundError(HTTPException):
) )
class JobAlreadyCanceledError(HTTPException):
"""Задача уже отменена."""
def __init__(self, job_id: str):
super().__init__(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Job {job_id} is already canceled or finished"
)

View File

@ -6,8 +6,9 @@ from http import HTTPStatus
from typing import Annotated from typing import Annotated
from uuid import UUID from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter, Depends
from dataloader.api.v1.exceptions import JobNotFoundError
from dataloader.api.v1.schemas import ( from dataloader.api.v1.schemas import (
JobStatusResponse, JobStatusResponse,
TriggerJobRequest, TriggerJobRequest,
@ -49,7 +50,7 @@ async def get_status(
""" """
st = await svc.status(job_id) st = await svc.status(job_id)
if not st: if not st:
raise HTTPException(status_code=HTTPStatus.NOT_FOUND, detail="job not found") raise JobNotFoundError(job_id=str(job_id))
return st return st
@ -63,5 +64,5 @@ async def cancel_job(
""" """
st = await svc.cancel(job_id) st = await svc.cancel(job_id)
if not st: if not st:
raise HTTPException(status_code=HTTPStatus.NOT_FOUND, detail="job not found") raise JobNotFoundError(job_id=str(job_id))
return st return st

View File

@ -0,0 +1,33 @@
# tests/unit/test_api_router_not_found.py
from __future__ import annotations
import pytest
from uuid import uuid4, UUID
from dataloader.api.v1.router import get_status, cancel_job
from dataloader.api.v1.exceptions import JobNotFoundError
from dataloader.api.v1.schemas import JobStatusResponse
class _FakeSvc:
async def status(self, job_id: UUID) -> JobStatusResponse | None:
return None
async def cancel(self, job_id: UUID) -> JobStatusResponse | None:
return None
@pytest.mark.unit
@pytest.mark.asyncio
async def test_router_get_status_raises_job_not_found():
svc = _FakeSvc()
with pytest.raises(JobNotFoundError):
await get_status(uuid4(), svc=svc)
@pytest.mark.unit
@pytest.mark.asyncio
async def test_router_cancel_raises_job_not_found():
svc = _FakeSvc()
with pytest.raises(JobNotFoundError):
await cancel_job(uuid4(), svc=svc)

View File

@ -0,0 +1,57 @@
# tests/unit/test_api_router_success.py
from __future__ import annotations
import pytest
from uuid import uuid4, UUID
from datetime import datetime, timezone
from dataloader.api.v1.router import get_status, cancel_job
from dataloader.api.v1.schemas import JobStatusResponse
class _SvcOK:
async def status(self, job_id: UUID) -> JobStatusResponse | None:
return JobStatusResponse(
job_id=job_id,
status="queued",
attempt=0,
started_at=None,
finished_at=None,
heartbeat_at=None,
error=None,
progress={},
)
async def cancel(self, job_id: UUID) -> JobStatusResponse | None:
return JobStatusResponse(
job_id=job_id,
status="canceled",
attempt=1,
started_at=datetime.now(timezone.utc),
finished_at=datetime.now(timezone.utc),
heartbeat_at=None,
error="by test",
progress={},
)
@pytest.mark.unit
@pytest.mark.asyncio
async def test_router_get_status_returns_response():
svc = _SvcOK()
jid = uuid4()
res = await get_status(jid, svc=svc)
assert isinstance(res, JobStatusResponse)
assert res.job_id == jid
assert res.status == "queued"
@pytest.mark.unit
@pytest.mark.asyncio
async def test_router_cancel_returns_response():
svc = _SvcOK()
jid = uuid4()
res = await cancel_job(jid, svc=svc)
assert isinstance(res, JobStatusResponse)
assert res.job_id == jid
assert res.status == "canceled"