Compare commits

..

No commits in common. "0e888ec910f8e169db3de6dffb511e80068f3144" and "b02b4e84fecf0ff6a26b8d68c340845922c6b807" have entirely different histories.

11 changed files with 39 additions and 3047 deletions

View File

@ -1,926 +0,0 @@
про бд для OPU - нужно полученные строки грузить также по курсору стримингом в бд, но перед этим надо truncate таблицу делать, DDL вот -
Это тестовый скрипт для взаимодействия с OPU
Но тут надо сделать модель репозиторий, интерфейс и т.п, в скрипте показано только принцип получения данных.
# test_export.py
"""
Скрипт для тестирования экспорта:
- запуск задачи
- мониторинг статуса
- скачивание (полное и по частям)
- распаковка
- подсчёт строк
- замер размеров
"""
import asyncio
import httpx
import zstandard as zstd
from loguru import logger
from pathlib import Path
# ========= НАСТРОЙКИ =========
BASE_URL = "https://ci02533826-tib-brief.apps.ift-terra000024-edm.ocp.delta.sbrf.ru "
ENDPOINT_START = "/export/opu/start"
ENDPOINT_STATUS = "/export/{job_id}/status"
ENDPOINT_DOWNLOAD = "/export/{job_id}/download"
POLL_INTERVAL = 2
TIMEOUT = 3600
CHUNK_SIZE = 8192
OUTPUT_DIR = Path("./export_test")
OUTPUT_DIR.mkdir(exist_ok=True)
# ============================
def sizeof_fmt(num: int) -> str:
"""Форматирует размер файла в человекочитаемый вид."""
for unit in ['B', 'KB', 'MB', 'GB']:
if num < 1024.0:
return f"{num:.1f} {unit}"
num /= 1024.0
return f"{num:.1f} TB"
async def download_full(client: httpx.AsyncClient, url: str, filepath: Path) -> bool:
"""Скачивает файл полностью."""
logger.info(f"⬇️ Полная загрузка: {filepath.name}")
try:
response = await client.get(url, follow_redirects=True)
if response.status_code != 200:
logger.error(f"❌ Ошибка полной загрузки: {response.status_code} {response.text}")
return False
with open(filepath, "wb") as f:
f.write(response.content)
logger.success(f" Полная загрузка завершена: {sizeof_fmt(filepath.stat().st_size)}")
return True
except Exception as e:
logger.error(f"❌ Ошибка при полной загрузке: {e}")
return False
async def download_range(
client: httpx.AsyncClient,
url: str,
filepath: Path,
start: int,
end: int | None = None
) -> bool:
"""Скачивает диапазон байтов."""
headers = {"Range": f"bytes={start}-{end if end else ''}"}
logger.info(f"⬇️ Загрузка диапазона {start}-{end if end else 'end'} -> {filepath.name}")
try:
response = await client.get(url, headers=headers, follow_redirects=True)
if response.status_code != 206:
logger.error(f"❌ Ожидался 206 Partial Content, получен: {response.status_code}")
return False
with open(filepath, "wb") as f:
f.write(response.content)
logger.success(f" Диапазон сохранён: {sizeof_fmt(filepath.stat().st_size)} байт")
return True
except Exception as e:
logger.error(f"❌ Ошибка при загрузке диапазона: {e}")
return False
async def main():
logger.info("🚀 Запуск теста экспорта данных")
cert_path = r"C:\Users\23193453\Documents\code\cert\client_cert.pem"
key_path = r"C:\Users\23193453\Documents\code\cert\client_cert.key"
ca_path = r"C:\Users\23193453\Documents\code\cert\client_ca.pem"
async with httpx.AsyncClient(
cert=(cert_path, key_path),
verify=ca_path,
timeout=TIMEOUT
) as client:
try:
# --- Шаг 1: Запуск задачи ---
logger.info("📨 Отправка запроса на запуск экспорта OPU...")
response = await client.post(BASE_URL + ENDPOINT_START)
if response.status_code != 200:
logger.error(f"❌ Ошибка запуска задачи: {response.status_code} {response.text}")
return
job_id = response.json()["job_id"]
logger.info(f" Задача запущена: job_id={job_id}")
# --- Шаг 2: Мониторинг статуса ---
status_url = ENDPOINT_STATUS.format(job_id=job_id)
logger.info("⏳ Ожидание завершения задачи...")
start_wait = asyncio.get_event_loop().time()
while True:
response = await client.get(BASE_URL + status_url)
if response.status_code != 200:
logger.warning(f"⚠️ Ошибка при получении статуса: {response.status_code}")
await asyncio.sleep(POLL_INTERVAL)
continue
status_data = response.json()
status = status_data["status"]
total_rows = status_data["total_rows"]
elapsed = asyncio.get_event_loop().time() - start_wait
logger.debug(f"📊 Статус: {status}, строк обработано: {total_rows}, прошло: {elapsed:.1f} с")
if status == "completed":
logger.info(f"🎉 Задача завершена! Обработано строк: {total_rows}")
break
elif status == "failed":
logger.error(f"💥 Задача завершилась с ошибкой: {status_data['error']}")
return
elif status in ("pending", "running"):
await asyncio.sleep(POLL_INTERVAL)
continue
download_url = BASE_URL + ENDPOINT_DOWNLOAD.format(job_id=job_id)
# --- Тест 1: Полная загрузка ---
full_path = OUTPUT_DIR / f"full_export_{job_id}.jsonl.zst"
if not await download_full(client, download_url, full_path):
return
# --- Тест 2: Первые 1024 байта ---
range_path = OUTPUT_DIR / f"range_head_{job_id}.bin"
if not await download_range(client, download_url, range_path, start=0, end=1023):
return
# --- Тест 3: Возобновление с 1024 байта ---
resume_path = OUTPUT_DIR / f"range_resume_{job_id}.bin"
if not await download_range(client, download_url, resume_path, start=1024):
return
# --- Анализ полного архива ---
archive_size = full_path.stat().st_size
logger.success(f"📦 Полный архив: {sizeof_fmt(archive_size)}")
# --- Распаковка ---
unpacked_path = OUTPUT_DIR / f"export_{job_id}.jsonl"
logger.info(f"📦 Распаковка архива в: {unpacked_path.name}")
dctx = zstd.ZstdDecompressor()
try:
with open(full_path, "rb") as compressed:
with open(unpacked_path, "wb") as dest:
dctx.copy_stream(compressed, dest)
unpacked_size = unpacked_path.stat().st_size
logger.success(f" Распаковано: {sizeof_fmt(unpacked_size)}")
# --- Подсчёт строк ---
logger.info("🧮 Подсчёт строк в распакованном файле...")
with open(unpacked_path, "rb") as f:
line_count = sum(1 for _ in f)
logger.success(f" Файл содержит {line_count:,} строк")
# --- Итог ---
logger.info("📈 ИТОГИ:")
logger.info(f" Архив: {sizeof_fmt(archive_size)}")
logger.info(f" Распаковано: {sizeof_fmt(unpacked_size)}")
logger.info(f" Коэффициент сжатия: {archive_size / unpacked_size:.2f}x")
logger.info(f" Строк: {line_count:,}")
except Exception as e:
logger.exception(f"❌ Ошибка при распаковке: {e}")
logger.info("🏁 Все тесты завершены успешно!")
except httpx.ConnectError as e:
logger.critical(f"❌ Не удалось подключиться к серверу. Убедитесь, что сервис запущен. {e}")
except Exception as e:
logger.exception(f"❌ Неожиданная ошибка: {e}")
if __name__ == "__main__":
asyncio.run(main())
А от код сервиса, который отдает данные OPU -
а в отдающем сервисе OPU такой код -
service\src\gmap2\models\base.py -
"""
Модуль базовых моделей для ORM.
Содержит базовый класс и миксин для динамического определения имён таблиц
на основе конфигурации приложения.
"""
from sqlalchemy.orm import declarative_base, declared_attr
from gmap2.config import APP_CONFIG
TABLE_NAME_MAPPING = {
"opudata": APP_CONFIG.gp.opu_table,
"lprnewsdata": APP_CONFIG.gp.lpr_news_table,
}
BASE = declarative_base()
class DynamicTableMixin: # pylint: disable=too-few-public-methods
"""
Миксин для динамического определения имени таблицы через __tablename__.
Имя таблицы определяется по имени класса в нижнем регистре.
Если имя класса присутствует в TABLE_NAME_MAPPING, используется значение из маппинга.
В противном случае используется имя класса как имя таблицы.
"""
@declared_attr
def __tablename__(cls) -> str: # pylint: disable=no-self-argument
"""
Динамически возвращает имя таблицы для модели.
Использует маппинг из конфигурации, если имя класса известно.
В противном случае возвращает имя класса в нижнем регистре.
:return: Имя таблицы, используемое SQLAlchemy при работе с моделью.
"""
class_name = cls.__name__.lower()
return TABLE_NAME_MAPPING.get(class_name, class_name)
service\src\gmap2\models\opu.py -
"""
ORM-модель для таблицы OPU.
Замечание:
В БД отсутствует PRIMARY KEY. Для корректной работы SQLAlchemy
используется виртуальный составной первичный ключ (wf_row_id, wf_load_id).
Это не влияет на экспорт, но требуется для итерации через ORM.
"""
from sqlalchemy import Column, Date, DateTime, Float, Integer, String
from .base import BASE, DynamicTableMixin
class OPUData(DynamicTableMixin, BASE): # pylint: disable=too-few-public-methods
"""
Данные из таблицы brief_opu.
"""
__table_args__ = {"schema": "brief_opu_schema"}
wf_row_id = Column(Integer, primary_key=True, nullable=False)
wf_load_id = Column(Integer, primary_key=True, nullable=False)
object_id = Column(String)
object_nm = Column(String)
desk_nm = Column(String)
object_tp = Column(String)
object_unit = Column(String)
actdate = Column(Date)
layer_cd = Column(String)
layer_nm = Column(String)
measure = Column(String)
opu_cd = Column(String)
opu_nm_sh = Column(String)
opu_nm = Column(String)
opu_lvl = Column(Integer)
product_nm = Column(String)
opu_prnt_cd = Column(String)
opu_prnt_nm_sh = Column(String)
opu_prnt_nm = Column(String)
product_prnt_nm = Column(String)
sum_amountrub_p_usd = Column(Float)
wf_load_dttm = Column(DateTime)
service\src\gmap2\repositories\base.py -
"""
Базовый репозиторий с общим функционалом стриминга.
"""
from collections.abc import AsyncGenerator, Callable
from typing import Generic, List, Optional, TypeVar, Union
from typing_extensions import Self
from loguru import logger
from sqlalchemy import Select, select
from sqlalchemy.ext.asyncio import AsyncSession
_T_Model = TypeVar("_T_Model") # pylint: disable=invalid-name
class BaseRepository(Generic[_T_Model]):
"""
Абстрактный репозиторий с поддержкой стриминга.
"""
model: type[_T_Model]
default_order_by: Union[List[Callable[[], List]], List] = []
def __init__(self, session: AsyncSession) -> None:
"""
Инициализирует репозиторий.
Args:
session: Асинхронная сессия SQLAlchemy.
"""
self.session = session
async def stream_all(
self,
chunk_size: int = 10_000,
statement: Optional[Select[tuple[_T_Model]]] = None,
) -> AsyncGenerator[list[_T_Model], None]:
"""
Потоково извлекает записи из БД пачками.
Выполняет стриминг данных с использованием асинхронного курсора.
Поддерживает кастомный запрос и автоматическое упорядочивание.
:param chunk_size: Размер пачки записей для одной итерации.
:param statement: Опциональный SQL-запрос.
:yield: Список экземпляров модели, загруженных порциями.
"""
logger.info(f"Streaming {self.model.__name__} in batches of {chunk_size}")
stmt = statement or select(self.model)
if not statement and self.default_order_by:
stmt = stmt.order_by(*self.default_order_by)
try:
result = await self.session.stream(
stmt.execution_options(
stream_results=True,
max_row_count=chunk_size,
)
)
partitions_method = result.partitions
partitions_result = partitions_method(chunk_size)
async for partition in partitions_result:
items = [row[0] for row in partition]
yield items
logger.info(f"Finished streaming {self.model.__name__}")
except Exception as e:
logger.error(
f"Error during streaming {self.model.__name__}: {type(e).__name__}: {e}"
)
raise
@classmethod
def create(cls, session: AsyncSession) -> Self:
"""
Фабричный метод.
Args:
session: Асинхронная сессия.
Returns:
Экземпляр репозитория.
"""
return cls(session=session)
service\src\gmap2\repositories\opu_repository.py -
"""
Репозиторий для OPU.
"""
from gmap2.models.opu import OPUData
from .base import BaseRepository
class OPURepository(BaseRepository[OPUData]):
"""
Репозиторий для OPUData.
"""
model = OPUData
default_order_by = [
OPUData.wf_load_id,
OPUData.wf_row_id,
]
service\src\gmap2\api\v1\routes.py -
"""
Маршруты API для запуска, статуса и скачивания экспорта.
"""
from fastapi import APIRouter, BackgroundTasks, HTTPException, Request
from gmap2.services.job.background_worker import (
run_lpr_news_export_job,
run_opu_export_job,
)
from gmap2.services.job.job_manager import JobManager
from gmap2.utils.file_utils import create_range_aware_response
from .schemas import ExportJobStatus, StartExportResponse
router = APIRouter(prefix="/export", tags=["export"])
def get_job_manager(request: Request) -> JobManager:
"""
Извлекает JobManager из application state.
"""
return request.app.state.job_manager
@router.post("/opu/start", response_model=StartExportResponse)
async def start_opu_export(
request: Request, background_tasks: BackgroundTasks
) -> StartExportResponse:
"""
Запускает фоновую задачу экспорта данных OPU.
Returns:
Идентификатор задачи.
"""
job_manager = get_job_manager(request)
job_id = job_manager.start_job("opu")
background_tasks.add_task(run_opu_export_job, job_id, job_manager)
return StartExportResponse(job_id=job_id)
@router.post("/lpr-news/start", response_model=StartExportResponse)
async def start_lpr_news_export(
request: Request, background_tasks: BackgroundTasks
) -> StartExportResponse:
"""
Запускает фоновую задачу экспорта данных LPR News.
Returns:
Идентификатор задачи.
"""
job_manager = get_job_manager(request)
job_id = job_manager.start_job("lpr_news")
background_tasks.add_task(run_lpr_news_export_job, job_id, job_manager)
return StartExportResponse(job_id=job_id)
@router.get("/{job_id}/status", response_model=ExportJobStatus)
async def get_export_status(job_id: str, request: Request) -> ExportJobStatus:
"""
Возвращает текущий статус задачи экспорта.
Args:
job_id: Идентификатор задачи.
Returns:
Статус задачи.
Raises:
HTTPException 404: Если задача не найдена.
"""
job_manager = get_job_manager(request)
status = job_manager.get_job_status(job_id)
if not status:
raise HTTPException(status_code=404, detail="Job not found")
return ExportJobStatus(**status)
@router.get("/{job_id}/download")
async def download_export(job_id: str, request: Request):
"""
Стримит сжатый файл экспорта клиенту с поддержкой Range-запросов.
"""
job_manager = get_job_manager(request)
status = job_manager.get_job_status(job_id)
if not status:
raise HTTPException(status_code=404, detail="Job not found")
if status["status"] != "completed":
raise HTTPException(
status_code=409, detail=f"Job is {status['status']}, not completed"
)
file_path = status.get("temp_file_path")
if not file_path or not file_path.endswith(".jsonl.zst"):
raise HTTPException(status_code=500, detail="Export file not available")
filename = f"export_{job_id}.jsonl.zst"
return await create_range_aware_response(
file_path=file_path,
filename=filename,
request=request,
media_type="application/octet-stream",
)
service\src\gmap2\services\export\export_service.py -
"""
Сервис экспорта данных: объединяет репозиторий, форматирование и сжатие.
Формат: JSON Lines + Zstandard (.jsonl.zst), один непрерывный zstd-фрейм.
"""
from __future__ import annotations
import asyncio
import math
import os
import tempfile
import threading
from collections.abc import AsyncGenerator, Callable
from contextlib import asynccontextmanager
from datetime import date, datetime
from typing import Any, Tuple
import aiofiles
import zstandard as zstd
from loguru import logger
from orjson import OPT_NAIVE_UTC, OPT_UTC_Z # pylint: disable=no-name-in-module
from orjson import dumps as orjson_dumps # pylint: disable=no-name-in-module
from sqlalchemy.ext.asyncio import AsyncSession
from gmap2.repositories import LPRNewsRepository, OPURepository
from ..job.job_manager import JobManager
from .formatters import models_to_dicts
class _ZstdAsyncSink:
"""
Потокобезопасный приёмник для zstd.stream_writer.
Собирает сжатые данные по чанкам в памяти с использованием блокировки.
Предназначен для использования в асинхронном контексте,
где сжатие выполняется в отдельном потоке.
"""
__slots__ = ("_chunks", "_lock")
def __init__(self) -> None:
"""
Инициализирует приёмник с пустым списком чанков и блокировкой.
"""
self._chunks: list[bytes] = []
self._lock = threading.Lock()
def write(self, b: bytes) -> int:
"""
Записывает байтовый фрагмент в буфер.
Метод потокобезопасен.
Копирует данные и добавляет в внутренний список чанков.
:param b: Байтовые данные для записи.
:return: Длина записанных данных.
"""
with self._lock:
self._chunks.append(bytes(b))
return len(b)
def drain(self) -> list[bytes]:
"""
Извлекает все накопленные чанки, сбрасывая внутренний буфер.
Метод потокобезопасен.
Возвращает список всех чанков, записанных с момента последнего сброса.
:return: Список байтовых фрагментов.
"""
with self._lock:
chunks = self._chunks
self._chunks = []
return chunks
class ExportService:
"""
Основной сервис экспорта данных в формате JSON Lines + zstd.
"""
def __init__(self, chunk_size: int = 10_000, zstd_level: int = 3) -> None:
self.chunk_size = chunk_size
self.zstd_level = zstd_level
@asynccontextmanager
async def _export_to_zstd( # pylint: disable=too-many-arguments,too-many-locals
self,
*,
session: AsyncSession,
job_id: str,
job_manager: JobManager,
repo_factory: Callable[[AsyncSession], Any],
label: str,
temp_dir: str | None = None,
) -> AsyncGenerator[Tuple[AsyncGenerator[bytes, None], str], None]:
repo = repo_factory(session)
file_path: str | None = None
try:
with tempfile.NamedTemporaryFile(
delete=False, suffix=".jsonl.zst", dir=temp_dir
) as tmp:
file_path = tmp.name
logger.info(f"[export] {label}: start -> {file_path}")
cctx = zstd.ZstdCompressor(level=self.zstd_level)
sink = _ZstdAsyncSink()
async with aiofiles.open(file_path, "wb") as f:
writer = cctx.stream_writer(sink)
try:
async for batch in repo.stream_all(chunk_size=self.chunk_size):
if not batch:
continue
dicts = models_to_dicts(batch)
payload = ("\n".join(_dumps(d) for d in dicts) + "\n").encode(
"utf-8"
)
await asyncio.to_thread(writer.write, payload)
for chunk in sink.drain():
await f.write(chunk)
job_manager.increment_rows(job_id, len(batch))
await asyncio.to_thread(writer.flush, zstd.FLUSH_FRAME)
for chunk in sink.drain():
await f.write(chunk)
finally:
await asyncio.to_thread(writer.close)
for chunk in sink.drain():
await f.write(chunk)
await f.flush()
logger.info(f"[export] {label}: done -> {file_path}")
yield _stream_file(file_path), file_path
except Exception:
if file_path and os.path.exists(file_path):
await asyncio.to_thread(os.remove, file_path)
logger.exception(f"[export] {label}: failed, temporary file removed")
raise
@asynccontextmanager
async def export_opu_to_zstd(
self,
session: AsyncSession,
job_id: str,
job_manager: JobManager,
temp_dir: str | None = None,
) -> AsyncGenerator[Tuple[AsyncGenerator[bytes, None], str], None]:
"""
Экспорт OPU в один непрерывный zstd-поток.
"""
async with self._export_to_zstd(
session=session,
job_id=job_id,
job_manager=job_manager,
repo_factory=OPURepository,
label="OPU",
temp_dir=temp_dir,
) as ctx:
yield ctx
@asynccontextmanager
async def export_lpr_news_to_zstd(
self,
session: AsyncSession,
job_id: str,
job_manager: JobManager,
temp_dir: str | None = None,
) -> AsyncGenerator[Tuple[AsyncGenerator[bytes, None], str], None]:
"""
Экспорт LPR News в один непрерывный zstd-поток.
"""
async with self._export_to_zstd(
session=session,
job_id=job_id,
job_manager=job_manager,
repo_factory=LPRNewsRepository,
label="LPR News",
temp_dir=temp_dir,
) as ctx:
yield ctx
def _dumps(data: dict) -> str:
"""
Сериализует словарь в JSON-строку (orjson).
"""
return orjson_dumps(
data,
default=_serialize_value,
option=OPT_UTC_Z | OPT_NAIVE_UTC,
).decode("utf-8")
def _serialize_value(value: Any) -> Any:
"""
Преобразует значения к JSON-совместимому виду.
"""
if isinstance(value, (datetime, date)):
return value.isoformat()
if isinstance(value, float) and not math.isfinite(value):
return None
return value
async def _stream_file(
file_path: str, chunk_size: int = 8192
) -> AsyncGenerator[bytes, None]:
"""
Асинхронно читает файл блоками.
"""
async with aiofiles.open(file_path, "rb") as f:
while chunk := await f.read(chunk_size):
yield chunk
service\src\gmap2\services\export\compressors.py -
"""
Работа со сжатием данных с использованием zstandard.
"""
from typing import BinaryIO
import zstandard as zstd
def create_zstd_writer(fileobj: BinaryIO, level: int = 3) -> zstd.ZstdCompressionWriter:
"""
Создаёт сжатый writer поверх бинарного файла.
Args:
fileobj: Целевой файл (например, tempfile).
level: Уровень сжатия (110). По умолчанию 3 - баланс скорости и размера.
Returns:
Объект для записи сжатых данных.
"""
cctx = zstd.ZstdCompressor(level=level)
return cctx.stream_writer(fileobj)
service\src\gmap2\services\export\formatters.py -
"""
Форматирование ORM-объектов в словари.
С оптимизацией: кеширование структуры модели.
"""
from datetime import date, datetime
from decimal import Decimal
from typing import Any, Dict, List
from weakref import WeakKeyDictionary
from sqlalchemy import inspect
from sqlalchemy.orm import RelationshipProperty
_columns_cache: WeakKeyDictionary = WeakKeyDictionary()
def _get_model_columns(model_instance: Any) -> List[str]:
"""
Возвращает список имен колонок модели (без отношений).
Кеширует результат.
"""
model_class = model_instance.__class__
if model_class not in _columns_cache:
mapper = inspect(model_class)
columns = [
attr.key
for attr in mapper.attrs
if not isinstance(attr, RelationshipProperty)
]
_columns_cache[model_class] = columns
return _columns_cache[model_class]
def _serialize_value(value: Any) -> Any:
"""
Сериализует значение в JSON-совместимый формат.
"""
if isinstance(value, (datetime, date)):
return value.isoformat()
if isinstance(value, Decimal):
return float(value)
if value is None:
return None
return value
def models_to_dicts(instances: List[Any]) -> List[Dict[str, Any]]:
"""
Массовое преобразование списка ORM-объектов в словари.
Использует кеширование структуры модели для высокой производительности.
Args:
instances: Список ORM-объектов.
Returns:
Список словарей.
"""
if not instances:
return []
columns = _get_model_columns(instances[0])
return [
{col: _serialize_value(getattr(obj, col)) for col in columns}
for obj in instances
]
__all__ = [
"models_to_dicts",
]
service\src\gmap2\services\job\background_worker.py -
"""
Фоновые задачи экспорта.
"""
from psycopg import errors as pg_errors
from loguru import logger
from sqlalchemy.exc import DatabaseError, OperationalError
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
from gmap2.context import APP_CTX
from gmap2.services.export.export_service import ExportService
from typing import Optional
TRANSIENT_ERRORS = (
OperationalError,
DatabaseError,
ConnectionError,
TimeoutError,
pg_errors.ConnectionException,
pg_errors.AdminShutdown,
pg_errors.CannotConnectNow,
)
def _create_export_job(
export_method_name: str,
job_type: str,
):
"""
Фабрика фоновых задач экспорта.
Args:
export_method_name: Название метода экспорта в ExportService.
job_type: Тип задачи ("opu", "lpr_news").
Returns:
Асинхронная функция-задача.
"""
@retry(
stop=stop_after_attempt(10),
wait=wait_exponential(multiplier=1, max=60),
retry=retry_if_exception_type(TRANSIENT_ERRORS),
reraise=True,
)
async def run_export_job(
job_id: str, job_manager, temp_dir: Optional[str] = None
) -> None:
export_service = ExportService()
try:
async with APP_CTX.get_db_session() as session:
job_manager.mark_running(job_id)
export_method = getattr(export_service, export_method_name)
async with export_method(
session=session,
job_id=job_id,
job_manager=job_manager,
temp_dir=temp_dir,
) as (stream, file_path):
async for _ in stream:
pass
job_manager.mark_completed(job_id, file_path)
except Exception as e:
if not isinstance(e, TRANSIENT_ERRORS):
job_manager.mark_failed(job_id, f"{type(e).__name__}: {e}")
logger.exception(f"Export job {job_id} ({job_type}) failed")
raise
run_export_job.__name__ = f"run_{job_type}_export_job"
run_export_job.__doc__ = f"Фоновая задача экспорта {job_type.upper()} с retry."
return run_export_job
run_opu_export_job = _create_export_job("export_opu_to_zstd", "opu")
run_lpr_news_export_job = _create_export_job("export_lpr_news_to_zstd", "lpr_news")

File diff suppressed because it is too large Load Diff

526
README.md
View File

@ -153,108 +153,19 @@ src/dataloader/
## HTTP API (v1)
### POST `/api/v1/jobs/trigger`
- POST `/api/v1/jobs/trigger`
- Вход: `{queue, task, args?, idempotency_key?, lock_key, partition_key?, priority?, available_at?, max_attempts?, lease_ttl_sec?, producer?, consumer_group?}`
- Выход: `{job_id: UUID, status: str}`
- Идемпотентность по `idempotency_key` (если задан).
Постановка задачи в очередь (идемпотентная операция).
- GET `/api/v1/jobs/{job_id}/status`
- Выход: `{job_id, status, attempt, started_at?, finished_at?, heartbeat_at?, error?, progress: {}}`
**Request:**
```json
{
"queue": "load.cbr", // обязательно: имя очереди
"task": "load.cbr.rates", // обязательно: имя задачи для registry
"args": { // опционально: аргументы задачи
"date": "2025-01-10",
"currencies": ["USD", "EUR"]
},
"idempotency_key": "cbr_2025-01-10", // опционально: ключ идемпотентности
"lock_key": "cbr_rates", // обязательно: ключ для advisory lock
"partition_key": "2025-01-10", // опционально: ключ партиционирования
"priority": 100, // опционально: приоритет (меньше = выше)
"available_at": "2025-01-10T00:00:00Z", // опционально: отложенный запуск
"max_attempts": 3, // опционально: макс попыток (def: 5)
"lease_ttl_sec": 300, // опционально: TTL аренды (def: 60)
"producer": "api-client", // опционально: кто поставил
"consumer_group": "cbr-loaders" // опционально: группа потребителей
}
```
- POST `/api/v1/jobs/{job_id}/cancel`
- Выход: как в status
- Поведение: устанавливает `cancel_requested = true`; воркер завершает между шагами пайплайна.
**Response 200:**
```json
{
"job_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "queued"
}
```
**Коды ответов:**
- `200 OK` - задача создана или уже существует (идемпотентность)
- `400 Bad Request` - невалидные данные
- `500 Internal Server Error` - ошибка сервера
### GET `/api/v1/jobs/{job_id}/status`
Получение статуса задачи.
**Response 200:**
```json
{
"job_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "running", // queued/running/succeeded/failed/canceled
"attempt": 1, // текущая попытка
"started_at": "2025-01-10T12:00:00Z", // время первого запуска
"finished_at": null, // время завершения (если есть)
"heartbeat_at": "2025-01-10T12:01:30Z", // последний heartbeat
"error": null, // текст ошибки (если есть)
"progress": { // прогресс выполнения (custom)
"processed": 500,
"total": 1000
}
}
```
**Коды ответов:**
- `200 OK` - статус получен
- `404 Not Found` - задача не найдена
### POST `/api/v1/jobs/{job_id}/cancel`
Запрос кооперативной отмены задачи.
**Response 200:**
```json
{
"job_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "running",
"attempt": 1,
"started_at": "2025-01-10T12:00:00Z",
"heartbeat_at": "2025-01-10T12:01:30Z"
}
```
**Поведение:**
- Устанавливает флаг `cancel_requested = true` в БД
- Воркер проверяет флаг между `yield` в пайплайне
- При обнаружении флага воркер завершает задачу со статусом `canceled`
**Коды ответов:**
- `200 OK` - запрос отмены принят
- `404 Not Found` - задача не найдена
### Инфраструктурные эндпоинты
**GET `/health`** - проверка работоспособности (без БД, < 20ms)
```json
{"status": "healthy"}
```
**GET `/info`** - информация о сервисе
```json
{
"service": "dataloader",
"version": "1.0.0",
"environment": "production"
}
```
Инфраструктурные: `/health`, `/info`.
## Воркеры, пайплайны и добавление новых ETLзадач
@ -267,78 +178,6 @@ src/dataloader/
5. Выполняет пайплайн (`task`) с поддержкой итеративных шагов и кооперативной отмены.
6. По завершении: `succeeded` или `failed`/`canceled`; при ошибках возможны ретраи до `max_attempts`.
### Протокол выполнения задачи (SQL)
**Claim (захват задачи):**
```sql
WITH cte AS (
SELECT job_id
FROM dl_jobs
WHERE status = 'queued'
AND queue = :queue
AND available_at <= now()
ORDER BY priority ASC, created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE dl_jobs j
SET status = 'running',
started_at = COALESCE(started_at, now()),
attempt = attempt + 1,
lease_expires_at = now() + make_interval(secs => j.lease_ttl_sec),
heartbeat_at = now()
FROM cte
WHERE j.job_id = cte.job_id
RETURNING j.job_id, j.task, j.args, j.lock_key, j.lease_ttl_sec;
```
**Heartbeat (продление аренды):**
```sql
UPDATE dl_jobs
SET heartbeat_at = now(),
lease_expires_at = now() + make_interval(secs => :ttl)
WHERE job_id = :job_id AND status = 'running'
RETURNING cancel_requested;
```
**Завершение успешное:**
```sql
UPDATE dl_jobs
SET status = 'succeeded',
finished_at = now(),
lease_expires_at = NULL
WHERE job_id = :job_id;
SELECT pg_advisory_unlock(hashtext(:lock_key));
```
**Завершение с ошибкой (retry):**
```sql
UPDATE dl_jobs
SET status = CASE WHEN attempt < max_attempts THEN 'queued' ELSE 'failed' END,
available_at = CASE WHEN attempt < max_attempts
THEN now() + make_interval(secs => 30 * attempt)
ELSE now() END,
error = :error_message,
lease_expires_at = NULL,
finished_at = CASE WHEN attempt >= max_attempts THEN now() ELSE NULL END
WHERE job_id = :job_id;
SELECT pg_advisory_unlock(hashtext(:lock_key));
```
**Reaper (возврат потерянных задач):**
```sql
UPDATE dl_jobs
SET status = 'queued',
available_at = now(),
lease_expires_at = NULL
WHERE status = 'running'
AND lease_expires_at IS NOT NULL
AND lease_expires_at < now()
RETURNING job_id;
```
### Интерфейс пайплайна
Пайплайн - обычная функция, возвращающая одно из:
- асинхронный генератор шагов (рекомендуется для длинных процессов),
@ -375,84 +214,11 @@ async def load_customers(args: dict) -> AsyncIterator[None]:
- Для бизнес‑взаимного исключения выбирайте корректный `lock_key` (например, `customer:{id}`), чтобы параллельные задачи не конфликтовали.
### Добавление ETLзадачи (шаги)
**1. Создать пайплайн** в `src/dataloader/workers/pipelines/`:
```python
# src/dataloader/workers/pipelines/load_cbr_rates.py
from __future__ import annotations
from typing import AsyncIterator
from datetime import datetime
from dataloader.workers.pipelines.registry import register
@register("load.cbr.rates")
async def load_cbr_rates(args: dict) -> AsyncIterator[None]:
"""
Загрузка курсов валют ЦБ РФ.
Args:
args: {"date": "YYYY-MM-DD", "currencies": ["USD", "EUR"]}
"""
date = datetime.fromisoformat(args["date"])
currencies = args.get("currencies", ["USD", "EUR"])
# Извлечение данных
data = await fetch_cbr_rates(date, currencies)
yield # Heartbeat checkpoint
# Трансформация
transformed = transform_rates(data)
yield # Heartbeat checkpoint
# Идемпотентная загрузка в БД
async with get_target_session() as session:
await session.execute(
insert(CbrRates)
.values(transformed)
.on_conflict_do_update(
index_elements=["date", "currency"],
set_={"rate": excluded.c.rate, "updated_at": func.now()}
)
)
await session.commit()
yield
```
**2. Настроить воркеры** в `.env`:
```bash
WORKERS_JSON='[{"queue":"load.cbr","concurrency":2}]'
```
**3. Поставить задачу через API**:
```bash
curl -X POST http://localhost:8081/api/v1/jobs/trigger \
-H "Content-Type: application/json" \
-d '{
"queue": "load.cbr",
"task": "load.cbr.rates",
"args": {"date": "2025-01-10", "currencies": ["USD", "EUR"]},
"lock_key": "cbr_rates_2025-01-10",
"partition_key": "2025-01-10",
"priority": 100,
"max_attempts": 3,
"lease_ttl_sec": 300
}'
```
**4. Мониторить выполнение**:
```bash
# Получить job_id из ответа и проверить статус
curl http://localhost:8081/api/v1/jobs/{job_id}/status
```
**Ключевые моменты**:
- Пайплайн должен быть идемпотентным (повторный запуск не должен ломать данные)
- Используйте `yield` после каждого значимого чанка работы для heartbeat
- `lock_key` должен обеспечивать взаимное исключение (например, `customer:{id}`)
- `partition_key` используется для параллелизации независимых задач
1. Создать модуль в `workers/pipelines/` и зарегистрировать обработчик через `@register`.
2. Убедиться, что модуль импортируется автоматически (происходит на старте через `load_all()`).
3. При необходимости расширить схему `args` и валидацию на стороне продьюсера (кто вызывает API).
4. При постановке задачи в `POST /api/v1/jobs/trigger` указать `task` с новым именем и желаемые `args`.
5. При необходимости завести отдельную очередь (`queue`) и добавить её в `WORKERS_JSON` с нужной `concurrency`.
## Логирование, метрики, аудит
@ -462,257 +228,19 @@ curl http://localhost:8081/api/v1/jobs/{job_id}/status
## Тестирование
### Структура тестов
```
tests/
├── conftest.py # Глобальные фикстуры (db_engine, db_session, client)
├── integration_tests/ # Интеграционные тесты с реальной БД
│ ├── test_queue_repository.py # 12 тестов репозитория
│ └── test_api_endpoints.py # 7 тестов API endpoints
└── unit/ # Юнит-тесты с моками (92 теста)
├── test_config.py # 30 тестов конфигурации
├── test_context.py # 13 тестов AppContext
├── test_api_service.py # 10 тестов сервисного слоя
├── test_notify_listener.py # 13 тестов LISTEN/NOTIFY
├── test_workers_base.py # 14 тестов PGWorker
├── test_workers_manager.py # 10 тестов WorkerManager
└── test_pipeline_registry.py # 5 тестов реестра пайплайнов
```
### Запуск тестов
```bash
# Все тесты (111 тестов)
poetry run pytest
# Только юнит-тесты
poetry run pytest tests/unit/ -m unit
# Только интеграционные
poetry run pytest tests/integration_tests/ -m integration
# С покрытием кода
poetry run pytest --cov=dataloader --cov-report=html
# С подробным выводом
poetry run pytest -v -s
```
### Покрытие кода
Текущее покрытие: **91%** (788 строк / 715 покрыто)
```
Name Stmts Miss Cover
---------------------------------------------------------------
src/dataloader/config.py 79 0 100%
src/dataloader/context.py 39 0 100%
src/dataloader/api/v1/service.py 32 0 100%
src/dataloader/storage/models/queue.py 43 0 100%
src/dataloader/storage/schemas/queue.py 29 0 100%
src/dataloader/storage/notify_listener.py 49 0 100%
src/dataloader/workers/base.py 102 3 97%
src/dataloader/workers/manager.py 64 0 100%
src/dataloader/storage/repositories/queue 130 12 91%
---------------------------------------------------------------
TOTAL 788 73 91%
```
### Ключевые тест-сценарии
**Интеграционные тесты:**
- Постановка задачи через API → проверка статуса
- Идемпотентность через `idempotency_key`
- Claim задачи → heartbeat → успешное завершение
- Claim задачи → ошибка → retry → финальный fail
- Конкуренция воркеров через advisory lock
- Возврат потерянных задач (reaper)
- Отмена задачи пользователем
**Юнит-тесты:**
- Конфигурация из переменных окружения
- Создание и управление воркерами
- LISTEN/NOTIFY механизм
- Сервисный слой и репозиторий
- Протокол heartbeat и отмены
- Юнит‑тесты: `tests/unit/*`
- Интеграционные тесты: `tests/integration_tests/*` - покрывают API, репозиторий очереди, reaper.
- Запуск:
```bash
poetry run pytest -q
```
### Масштабирование
- **Вертикальное**: Увеличение `concurrency` в `WORKERS_JSON` для существующих воркеров
- **Горизонтальное**: Увеличение количества реплик (pods). Очередь в БД и advisory-lock обеспечат корректность при конкуренции между репликами
- **По очередям**: Разные deployment'ы для разных очередей с разными ресурсами
### Graceful Shutdown
При получении SIGTERM:
1. Останавливает прием новых HTTP запросов
2. Сигнализирует воркерам о необходимости завершения
3. Ждет завершения текущих задач (timeout 30 сек)
4. Останавливает reaper
5. Закрывает соединения с БД
### Мониторинг
**Health Checks:**
- `GET /health` - проверка работоспособности (без БД, < 20ms)
- `GET /info` - информация о версии
**Метрики (если включен metric_router):**
- Количество задач по статусам (queued, running, succeeded, failed)
- Время выполнения задач (p50, p95, p99)
- Количество активных воркеров
- Частота ошибок
**Логи:**
Структурированные JSON-логи с уровнями: DEBUG, INFO, WARNING, ERROR, CRITICAL
**Ключевые события для алертов:**
- `worker.claim.backoff` - частые backoff'ы (возможна конкуренция)
- `worker.complete.failed` - высокий процент ошибок
- `reaper.requeued` - частый возврат потерянных задач (проблемы с lease)
- `api.error` - ошибки API
## Эксплуатация и масштабирование
- Один процесс FastAPI с пулом асинхронных воркеров внутри. Масштабирование - количеством реплик (Deployment). Очередь в БД и advisorylock обеспечат корректность при гонках между репликами.
- Readiness/Liveness - `/health`.
- Корректное завершение: при SIGTERM менеджер подаёт сигнал остановки, завершает воркеры и reaper, закрывает соединения.
## Troubleshooting
### Задачи застревают в статусе `queued`
**Симптомы:** Задачи не начинают выполняться, остаются в `queued`.
**Возможные причины:**
1. Воркеры не запущены или упали
2. Нет воркеров для данной очереди в `WORKERS_JSON`
3. `available_at` в будущем
**Решение:**
```bash
# Проверить логи воркеров
docker logs dataloader | grep worker
# Проверить конфигурацию
echo $WORKERS_JSON
# Проверить задачи в БД
SELECT job_id, queue, status, available_at, created_at
FROM dl_jobs
WHERE status = 'queued'
ORDER BY created_at DESC
LIMIT 10;
```
### Задачи часто возвращаются в `queued` (backoff)
**Симптомы:** В логах частые события `worker.claim.backoff`.
**Причины:**
- Конкуренция за `lock_key`: несколько задач с одинаковым `lock_key` одновременно
- Advisory lock уже занят другим процессом
**Решение:**
- Проверить корректность выбора `lock_key` (должен быть уникальным для бизнес-сущности)
- Использовать `partition_key` для распределения нагрузки
- Снизить `concurrency` для данной очереди
### Высокий процент `failed` задач
**Симптомы:** Много задач завершаются с `status = 'failed'`.
**Диагностика:**
```sql
SELECT job_id, task, error, attempt, max_attempts
FROM dl_jobs
WHERE status = 'failed'
ORDER BY finished_at DESC
LIMIT 20;
```
**Возможные причины:**
- Ошибки в коде пайплайна
- Недоступность внешних сервисов
- Таймауты (превышение `lease_ttl_sec`)
- Неверные аргументы в `args`
**Решение:**
- Проверить логи с `job_id`
- Увеличить `max_attempts` для retry
- Увеличить `lease_ttl_sec` для долгих операций
- Исправить код пайплайна
### Медленное выполнение задач
**Симптомы:** Задачи выполняются дольше ожидаемого.
**Диагностика:**
```sql
SELECT
task,
AVG(EXTRACT(EPOCH FROM (finished_at - started_at))) as avg_duration_sec,
COUNT(*) as total
FROM dl_jobs
WHERE status IN ('succeeded', 'failed')
AND finished_at > NOW() - INTERVAL '1 hour'
GROUP BY task
ORDER BY avg_duration_sec DESC;
```
**Возможные причины:**
- Неоптимальный код пайплайна
- Медленные внешние сервисы
- Недостаточно воркеров (`concurrency` слишком мал)
- Проблемы с БД (медленные запросы, блокировки)
**Решение:**
- Профилировать код пайплайна
- Увеличить `concurrency` в `WORKERS_JSON`
- Оптимизировать запросы к БД (индексы, batching)
- Масштабировать горизонтально (больше реплик)
### Утечка памяти
**Симптомы:** Постепенный рост потребления памяти, OOM kills.
**Диагностика:**
```bash
# Мониторинг памяти
kubectl top pods -l app=dataloader --containers
# Проверить логи перед падением
kubectl logs dataloader-xxx --previous
```
**Возможные причины:**
- Накопление объектов в памяти в пайплайне
- Незакрытые соединения/файлы
- Утечки в зависимостях
**Решение:**
- Использовать context managers (`async with`) для ресурсов
- Обрабатывать данные чанками, не загружать всё в память
- Периодически перезапускать воркеры (restart policy)
### Проблемы с LISTEN/NOTIFY
**Симптомы:** Воркеры не просыпаются сразу после постановки задачи.
**Диагностика:**
```bash
# Проверить логи listener
docker logs dataloader | grep "notify_listener\|LISTEN"
# Проверить триггеры в БД
SELECT * FROM pg_trigger WHERE tgname LIKE 'dl_jobs%';
```
**Возможные причины:**
- Триггеры не созданы или отключены
- Проблемы с подключением asyncpg
- Воркер не подписан на канал
**Решение:**
- Воркер автоматически fallback'ится на polling при проблемах с LISTEN/NOTIFY
- Проверить DDL: триггеры `dl_jobs_notify_ins` и `dl_jobs_notify_upd`
- Проверить права пользователя БД на LISTEN/NOTIFY
## Лицензия
Внутренний корпоративный проект.

View File

@ -13,4 +13,12 @@ class JobNotFoundError(HTTPException):
)
class JobAlreadyCanceledError(HTTPException):
"""Задача уже отменена."""
def __init__(self, job_id: str):
super().__init__(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Job {job_id} is already canceled or finished"
)

View File

@ -6,9 +6,8 @@ from http import HTTPStatus
from typing import Annotated
from uuid import UUID
from fastapi import APIRouter, Depends
from fastapi import APIRouter, Depends, HTTPException
from dataloader.api.v1.exceptions import JobNotFoundError
from dataloader.api.v1.schemas import (
JobStatusResponse,
TriggerJobRequest,
@ -50,7 +49,7 @@ async def get_status(
"""
st = await svc.status(job_id)
if not st:
raise JobNotFoundError(job_id=str(job_id))
raise HTTPException(status_code=HTTPStatus.NOT_FOUND, detail="job not found")
return st
@ -64,5 +63,5 @@ async def cancel_job(
"""
st = await svc.cancel(job_id)
if not st:
raise JobNotFoundError(job_id=str(job_id))
raise HTTPException(status_code=HTTPStatus.NOT_FOUND, detail="job not found")
return st

View File

@ -24,7 +24,6 @@ from dataloader.storage.engine import create_engine, create_sessionmaker
if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
pytestmark = pytest.mark.asyncio
@pytest_asyncio.fixture(scope="function")
async def db_engine() -> AsyncGenerator[AsyncEngine, None]:

View File

@ -1,33 +0,0 @@
# tests/unit/test_api_router_not_found.py
from __future__ import annotations
import pytest
from uuid import uuid4, UUID
from dataloader.api.v1.router import get_status, cancel_job
from dataloader.api.v1.exceptions import JobNotFoundError
from dataloader.api.v1.schemas import JobStatusResponse
class _FakeSvc:
async def status(self, job_id: UUID) -> JobStatusResponse | None:
return None
async def cancel(self, job_id: UUID) -> JobStatusResponse | None:
return None
@pytest.mark.unit
@pytest.mark.asyncio
async def test_router_get_status_raises_job_not_found():
svc = _FakeSvc()
with pytest.raises(JobNotFoundError):
await get_status(uuid4(), svc=svc)
@pytest.mark.unit
@pytest.mark.asyncio
async def test_router_cancel_raises_job_not_found():
svc = _FakeSvc()
with pytest.raises(JobNotFoundError):
await cancel_job(uuid4(), svc=svc)

View File

@ -1,57 +0,0 @@
# tests/unit/test_api_router_success.py
from __future__ import annotations
import pytest
from uuid import uuid4, UUID
from datetime import datetime, timezone
from dataloader.api.v1.router import get_status, cancel_job
from dataloader.api.v1.schemas import JobStatusResponse
class _SvcOK:
async def status(self, job_id: UUID) -> JobStatusResponse | None:
return JobStatusResponse(
job_id=job_id,
status="queued",
attempt=0,
started_at=None,
finished_at=None,
heartbeat_at=None,
error=None,
progress={},
)
async def cancel(self, job_id: UUID) -> JobStatusResponse | None:
return JobStatusResponse(
job_id=job_id,
status="canceled",
attempt=1,
started_at=datetime.now(timezone.utc),
finished_at=datetime.now(timezone.utc),
heartbeat_at=None,
error="by test",
progress={},
)
@pytest.mark.unit
@pytest.mark.asyncio
async def test_router_get_status_returns_response():
svc = _SvcOK()
jid = uuid4()
res = await get_status(jid, svc=svc)
assert isinstance(res, JobStatusResponse)
assert res.job_id == jid
assert res.status == "queued"
@pytest.mark.unit
@pytest.mark.asyncio
async def test_router_cancel_returns_response():
svc = _SvcOK()
jid = uuid4()
res = await cancel_job(jid, svc=svc)
assert isinstance(res, JobStatusResponse)
assert res.job_id == jid
assert res.status == "canceled"

View File

@ -1,14 +1,11 @@
# tests/integration_tests/test_queue_repository.py
from __future__ import annotations
from datetime import datetime, timezone, timedelta
from uuid import uuid4
from datetime import datetime, timezone
import pytest
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from dataloader.storage.models import DLJob
from dataloader.storage.repositories import QueueRepository
from dataloader.storage.schemas import CreateJobRequest, JobStatus
@ -454,232 +451,3 @@ class TestQueueRepository:
status = await repo.get_status(job_id)
assert status is not None
assert status.status == "queued"
async def test_claim_one_fails_on_advisory_lock_and_sets_backoff(
self,
db_session: AsyncSession,
clean_queue_tables,
job_id: str,
queue_name: str,
task_name: str,
):
"""
Проверка ветки отказа advisory-lock: задача возвращается в queued с отложенным available_at.
"""
repo = QueueRepository(db_session)
req = CreateJobRequest(
job_id=job_id,
queue=queue_name,
task=task_name,
args={"k": "v"},
idempotency_key=None,
lock_key="lock-fail-adv",
partition_key="",
priority=10,
available_at=datetime.now(timezone.utc),
max_attempts=5,
lease_ttl_sec=30,
producer=None,
consumer_group=None,
)
await repo.create_or_get(req)
async def _false_lock(_: str) -> bool:
return False
repo._try_advisory_lock = _false_lock # type: ignore[method-assign]
before = datetime.now(timezone.utc)
claimed = await repo.claim_one(queue_name, claim_backoff_sec=15)
after = datetime.now(timezone.utc)
assert claimed is None
st = await repo.get_status(job_id)
assert st is not None
assert st.status == "queued"
row = (await db_session.execute(select(DLJob).where(DLJob.job_id == job_id))).scalar_one()
assert row.available_at >= before + timedelta(seconds=15)
assert row.available_at <= after + timedelta(seconds=60)
async def test_heartbeat_when_not_running_returns_false(
self,
db_session: AsyncSession,
clean_queue_tables,
job_id: str,
queue_name: str,
task_name: str,
):
"""
Heartbeat для нерunning задачи возвращает (False, False).
"""
repo = QueueRepository(db_session)
req = CreateJobRequest(
job_id=job_id,
queue=queue_name,
task=task_name,
args={},
idempotency_key=None,
lock_key="lock-hb-not-running",
partition_key="",
priority=100,
available_at=datetime.now(timezone.utc),
max_attempts=5,
lease_ttl_sec=60,
producer=None,
consumer_group=None,
)
await repo.create_or_get(req)
ok, cancel = await repo.heartbeat(job_id, ttl_sec=30)
assert ok is False
assert cancel is False
async def test_finish_fail_or_retry_marks_canceled_branch(
self,
db_session: AsyncSession,
clean_queue_tables,
job_id: str,
queue_name: str,
task_name: str,
):
"""
Ветка is_canceled=True помечает задачу как canceled и завершает её.
"""
repo = QueueRepository(db_session)
req = CreateJobRequest(
job_id=job_id,
queue=queue_name,
task=task_name,
args={},
idempotency_key=None,
lock_key="lock-cancel",
partition_key="",
priority=100,
available_at=datetime.now(timezone.utc),
max_attempts=5,
lease_ttl_sec=60,
producer=None,
consumer_group=None,
)
await repo.create_or_get(req)
await repo.claim_one(queue_name, claim_backoff_sec=5)
await repo.finish_fail_or_retry(job_id, err="Canceled by test", is_canceled=True)
st = await repo.get_status(job_id)
assert st is not None
assert st.status == "canceled"
assert st.error == "Canceled by test"
assert st.finished_at is not None
async def test_requeue_lost_no_expired_returns_empty(
self,
db_session: AsyncSession,
clean_queue_tables,
job_id: str,
queue_name: str,
task_name: str,
):
"""
requeue_lost без протухших задач возвращает пустой список.
"""
repo = QueueRepository(db_session)
req = CreateJobRequest(
job_id=job_id,
queue=queue_name,
task=task_name,
args={},
idempotency_key=None,
lock_key="lock-none-expired",
partition_key="",
priority=100,
available_at=datetime.now(timezone.utc),
max_attempts=5,
lease_ttl_sec=120,
producer=None,
consumer_group=None,
)
await repo.create_or_get(req)
await repo.claim_one(queue_name, claim_backoff_sec=5)
res = await repo.requeue_lost(now=datetime.now(timezone.utc))
assert res == []
st = await repo.get_status(job_id)
assert st is not None
assert st.status == "running"
async def test_private_helpers_resolve_queue_and_advisory_unlock_are_executable(
self,
db_session: AsyncSession,
clean_queue_tables,
job_id: str,
queue_name: str,
task_name: str,
):
"""
Прямые прогоны приватных методов для покрытия редких веток.
"""
repo = QueueRepository(db_session)
rq = CreateJobRequest(
job_id=job_id,
queue=queue_name,
task=task_name,
args={},
idempotency_key=None,
lock_key="lock-direct-unlock",
partition_key="",
priority=1,
available_at=datetime.now(timezone.utc),
max_attempts=1,
lease_ttl_sec=5,
producer=None,
consumer_group=None,
)
await repo.create_or_get(rq)
missing_uuid = str(uuid4())
qname = await repo._resolve_queue(missing_uuid) # type: ignore[attr-defined]
assert qname == ""
await repo._advisory_unlock("lock-direct-unlock") # type: ignore[attr-defined]
async def test_cancel_returns_false_for_nonexistent_job(
self,
db_session: AsyncSession,
clean_queue_tables,
):
"""
Возвращает False при отмене несуществующей задачи.
"""
repo = QueueRepository(db_session)
assert await repo.cancel(str(uuid4())) is False
async def test_finish_ok_silent_when_job_absent(
self,
db_session: AsyncSession,
clean_queue_tables,
):
"""
Тихо завершается, если задача не найдена.
"""
repo = QueueRepository(db_session)
await repo.finish_ok(str(uuid4()))
async def test_finish_fail_or_retry_noop_when_job_absent(
self,
db_session: AsyncSession,
clean_queue_tables,
):
"""
Тихо выходит при отсутствии задачи.
"""
repo = QueueRepository(db_session)
await repo.finish_fail_or_retry(str(uuid4()), err="no-op")

View File

@ -444,69 +444,3 @@ class TestPGWorker:
results.append(_)
assert len(results) == 3
@pytest.mark.asyncio
async def test_claim_and_execute_once_handles_shutdown_cancelled_error(self):
cfg = WorkerConfig(queue="test", heartbeat_sec=10, claim_backoff_sec=5)
stop_event = asyncio.Event()
with patch("dataloader.workers.base.APP_CTX") as mock_ctx, \
patch("dataloader.workers.base.QueueRepository") as mock_repo_cls:
mock_session = AsyncMock()
mock_sm = MagicMock()
mock_sm.return_value.__aenter__.return_value = mock_session
mock_sm.return_value.__aexit__.return_value = AsyncMock(return_value=False)
mock_ctx.get_logger.return_value = Mock()
mock_ctx.sessionmaker = mock_sm
mock_repo = Mock()
mock_repo.claim_one = AsyncMock(return_value={
"job_id": "test-job-id",
"lease_ttl_sec": 60,
"task": "test.task",
"args": {}
})
mock_repo.finish_fail_or_retry = AsyncMock()
mock_repo_cls.return_value = mock_repo
worker = PGWorker(cfg, stop_event)
async def raise_cancel(*_args, **_kwargs):
raise asyncio.CancelledError()
with patch.object(worker, "_execute_with_heartbeat", new=raise_cancel):
await worker._claim_and_execute_once()
mock_repo.finish_fail_or_retry.assert_called_once()
args, kwargs = mock_repo.finish_fail_or_retry.call_args
assert args[0] == "test-job-id"
assert "cancelled by shutdown" in args[1]
assert kwargs.get("is_canceled") is True
@pytest.mark.asyncio
async def test_execute_with_heartbeat_raises_cancelled_when_stop_set(self):
cfg = WorkerConfig(queue="test", heartbeat_sec=1000, claim_backoff_sec=5)
stop_event = asyncio.Event()
stop_event.set()
with patch("dataloader.workers.base.APP_CTX") as mock_ctx, \
patch("dataloader.workers.base.QueueRepository") as mock_repo_cls:
mock_ctx.get_logger.return_value = Mock()
mock_ctx.sessionmaker = Mock()
mock_repo_cls.return_value = Mock()
worker = PGWorker(cfg, stop_event)
async def one_yield():
yield
with pytest.raises(asyncio.CancelledError):
await worker._execute_with_heartbeat("job-id", 60, one_yield())

View File

@ -1,27 +0,0 @@
# tests/unit/test_workers_reaper.py
from __future__ import annotations
import pytest
from unittest.mock import AsyncMock, patch, Mock
from dataloader.workers.reaper import requeue_lost
@pytest.mark.unit
@pytest.mark.asyncio
async def test_requeue_lost_calls_repository_and_returns_ids():
"""
Проверяет, что requeue_lost вызывает QueueRepository.requeue_lost и возвращает результат.
"""
fake_session = Mock()
with patch("dataloader.workers.reaper.QueueRepository") as repo_cls:
repo = Mock()
repo.requeue_lost = AsyncMock(return_value=["id1", "id2"])
repo_cls.return_value = repo
res = await requeue_lost(fake_session)
assert res == ["id1", "id2"]
repo_cls.assert_called_once_with(fake_session)
repo.requeue_lost.assert_awaited_once()