Compare commits
4 Commits
b02b4e84fe
...
0e888ec910
| Author | SHA1 | Date |
|---|---|---|
|
|
0e888ec910 | |
|
|
e0829d66f8 | |
|
|
6364c97f21 | |
|
|
309e62c410 |
|
|
@ -0,0 +1,926 @@
|
|||
про бд для OPU - нужно полученные строки грузить также по курсору стримингом в бд, но перед этим надо truncate таблицу делать, DDL вот -
|
||||
|
||||
|
||||
|
||||
|
||||
Это тестовый скрипт для взаимодействия с OPU
|
||||
Но тут надо сделать модель репозиторий, интерфейс и т.п, в скрипте показано только принцип получения данных.
|
||||
|
||||
# test_export.py
|
||||
"""
|
||||
Скрипт для тестирования экспорта:
|
||||
- запуск задачи
|
||||
- мониторинг статуса
|
||||
- скачивание (полное и по частям)
|
||||
- распаковка
|
||||
- подсчёт строк
|
||||
- замер размеров
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import httpx
|
||||
import zstandard as zstd
|
||||
from loguru import logger
|
||||
from pathlib import Path
|
||||
|
||||
# ========= НАСТРОЙКИ =========
|
||||
BASE_URL = "https://ci02533826-tib-brief.apps.ift-terra000024-edm.ocp.delta.sbrf.ru "
|
||||
ENDPOINT_START = "/export/opu/start"
|
||||
ENDPOINT_STATUS = "/export/{job_id}/status"
|
||||
ENDPOINT_DOWNLOAD = "/export/{job_id}/download"
|
||||
|
||||
POLL_INTERVAL = 2
|
||||
TIMEOUT = 3600
|
||||
CHUNK_SIZE = 8192
|
||||
|
||||
OUTPUT_DIR = Path("./export_test")
|
||||
OUTPUT_DIR.mkdir(exist_ok=True)
|
||||
# ============================
|
||||
|
||||
|
||||
def sizeof_fmt(num: int) -> str:
|
||||
"""Форматирует размер файла в человекочитаемый вид."""
|
||||
for unit in ['B', 'KB', 'MB', 'GB']:
|
||||
if num < 1024.0:
|
||||
return f"{num:.1f} {unit}"
|
||||
num /= 1024.0
|
||||
return f"{num:.1f} TB"
|
||||
|
||||
|
||||
async def download_full(client: httpx.AsyncClient, url: str, filepath: Path) -> bool:
|
||||
"""Скачивает файл полностью."""
|
||||
logger.info(f"⬇️ Полная загрузка: {filepath.name}")
|
||||
try:
|
||||
response = await client.get(url, follow_redirects=True)
|
||||
if response.status_code != 200:
|
||||
logger.error(f"❌ Ошибка полной загрузки: {response.status_code} {response.text}")
|
||||
return False
|
||||
with open(filepath, "wb") as f:
|
||||
f.write(response.content)
|
||||
logger.success(f" Полная загрузка завершена: {sizeof_fmt(filepath.stat().st_size)}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Ошибка при полной загрузке: {e}")
|
||||
return False
|
||||
|
||||
|
||||
async def download_range(
|
||||
client: httpx.AsyncClient,
|
||||
url: str,
|
||||
filepath: Path,
|
||||
start: int,
|
||||
end: int | None = None
|
||||
) -> bool:
|
||||
"""Скачивает диапазон байтов."""
|
||||
headers = {"Range": f"bytes={start}-{end if end else ''}"}
|
||||
logger.info(f"⬇️ Загрузка диапазона {start}-{end if end else 'end'} -> {filepath.name}")
|
||||
try:
|
||||
response = await client.get(url, headers=headers, follow_redirects=True)
|
||||
if response.status_code != 206:
|
||||
logger.error(f"❌ Ожидался 206 Partial Content, получен: {response.status_code}")
|
||||
return False
|
||||
with open(filepath, "wb") as f:
|
||||
f.write(response.content)
|
||||
logger.success(f" Диапазон сохранён: {sizeof_fmt(filepath.stat().st_size)} байт")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Ошибка при загрузке диапазона: {e}")
|
||||
return False
|
||||
|
||||
|
||||
async def main():
|
||||
logger.info("🚀 Запуск теста экспорта данных")
|
||||
cert_path = r"C:\Users\23193453\Documents\code\cert\client_cert.pem"
|
||||
key_path = r"C:\Users\23193453\Documents\code\cert\client_cert.key"
|
||||
ca_path = r"C:\Users\23193453\Documents\code\cert\client_ca.pem"
|
||||
|
||||
async with httpx.AsyncClient(
|
||||
cert=(cert_path, key_path),
|
||||
verify=ca_path,
|
||||
timeout=TIMEOUT
|
||||
) as client:
|
||||
try:
|
||||
# --- Шаг 1: Запуск задачи ---
|
||||
logger.info("📨 Отправка запроса на запуск экспорта OPU...")
|
||||
response = await client.post(BASE_URL + ENDPOINT_START)
|
||||
if response.status_code != 200:
|
||||
logger.error(f"❌ Ошибка запуска задачи: {response.status_code} {response.text}")
|
||||
return
|
||||
job_id = response.json()["job_id"]
|
||||
logger.info(f" Задача запущена: job_id={job_id}")
|
||||
|
||||
# --- Шаг 2: Мониторинг статуса ---
|
||||
status_url = ENDPOINT_STATUS.format(job_id=job_id)
|
||||
logger.info("⏳ Ожидание завершения задачи...")
|
||||
start_wait = asyncio.get_event_loop().time()
|
||||
|
||||
while True:
|
||||
response = await client.get(BASE_URL + status_url)
|
||||
if response.status_code != 200:
|
||||
logger.warning(f"⚠️ Ошибка при получении статуса: {response.status_code}")
|
||||
await asyncio.sleep(POLL_INTERVAL)
|
||||
continue
|
||||
|
||||
status_data = response.json()
|
||||
status = status_data["status"]
|
||||
total_rows = status_data["total_rows"]
|
||||
|
||||
elapsed = asyncio.get_event_loop().time() - start_wait
|
||||
logger.debug(f"📊 Статус: {status}, строк обработано: {total_rows}, прошло: {elapsed:.1f} с")
|
||||
|
||||
if status == "completed":
|
||||
logger.info(f"🎉 Задача завершена! Обработано строк: {total_rows}")
|
||||
break
|
||||
elif status == "failed":
|
||||
logger.error(f"💥 Задача завершилась с ошибкой: {status_data['error']}")
|
||||
return
|
||||
elif status in ("pending", "running"):
|
||||
await asyncio.sleep(POLL_INTERVAL)
|
||||
continue
|
||||
|
||||
download_url = BASE_URL + ENDPOINT_DOWNLOAD.format(job_id=job_id)
|
||||
|
||||
# --- Тест 1: Полная загрузка ---
|
||||
full_path = OUTPUT_DIR / f"full_export_{job_id}.jsonl.zst"
|
||||
if not await download_full(client, download_url, full_path):
|
||||
return
|
||||
|
||||
# --- Тест 2: Первые 1024 байта ---
|
||||
range_path = OUTPUT_DIR / f"range_head_{job_id}.bin"
|
||||
if not await download_range(client, download_url, range_path, start=0, end=1023):
|
||||
return
|
||||
|
||||
# --- Тест 3: Возобновление с 1024 байта ---
|
||||
resume_path = OUTPUT_DIR / f"range_resume_{job_id}.bin"
|
||||
if not await download_range(client, download_url, resume_path, start=1024):
|
||||
return
|
||||
|
||||
# --- Анализ полного архива ---
|
||||
archive_size = full_path.stat().st_size
|
||||
logger.success(f"📦 Полный архив: {sizeof_fmt(archive_size)}")
|
||||
|
||||
# --- Распаковка ---
|
||||
unpacked_path = OUTPUT_DIR / f"export_{job_id}.jsonl"
|
||||
logger.info(f"📦 Распаковка архива в: {unpacked_path.name}")
|
||||
dctx = zstd.ZstdDecompressor()
|
||||
try:
|
||||
with open(full_path, "rb") as compressed:
|
||||
with open(unpacked_path, "wb") as dest:
|
||||
dctx.copy_stream(compressed, dest)
|
||||
unpacked_size = unpacked_path.stat().st_size
|
||||
logger.success(f" Распаковано: {sizeof_fmt(unpacked_size)}")
|
||||
|
||||
# --- Подсчёт строк ---
|
||||
logger.info("🧮 Подсчёт строк в распакованном файле...")
|
||||
with open(unpacked_path, "rb") as f:
|
||||
line_count = sum(1 for _ in f)
|
||||
logger.success(f" Файл содержит {line_count:,} строк")
|
||||
|
||||
# --- Итог ---
|
||||
logger.info("📈 ИТОГИ:")
|
||||
logger.info(f" Архив: {sizeof_fmt(archive_size)}")
|
||||
logger.info(f" Распаковано: {sizeof_fmt(unpacked_size)}")
|
||||
logger.info(f" Коэффициент сжатия: {archive_size / unpacked_size:.2f}x")
|
||||
logger.info(f" Строк: {line_count:,}")
|
||||
except Exception as e:
|
||||
logger.exception(f"❌ Ошибка при распаковке: {e}")
|
||||
|
||||
logger.info("🏁 Все тесты завершены успешно!")
|
||||
|
||||
except httpx.ConnectError as e:
|
||||
logger.critical(f"❌ Не удалось подключиться к серверу. Убедитесь, что сервис запущен. {e}")
|
||||
except Exception as e:
|
||||
logger.exception(f"❌ Неожиданная ошибка: {e}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
А от код сервиса, который отдает данные OPU -
|
||||
|
||||
а в отдающем сервисе OPU такой код -
|
||||
service\src\gmap2\models\base.py -
|
||||
"""
|
||||
Модуль базовых моделей для ORM.
|
||||
|
||||
Содержит базовый класс и миксин для динамического определения имён таблиц
|
||||
на основе конфигурации приложения.
|
||||
"""
|
||||
|
||||
from sqlalchemy.orm import declarative_base, declared_attr
|
||||
|
||||
from gmap2.config import APP_CONFIG
|
||||
|
||||
TABLE_NAME_MAPPING = {
|
||||
"opudata": APP_CONFIG.gp.opu_table,
|
||||
"lprnewsdata": APP_CONFIG.gp.lpr_news_table,
|
||||
}
|
||||
|
||||
|
||||
BASE = declarative_base()
|
||||
|
||||
|
||||
class DynamicTableMixin: # pylint: disable=too-few-public-methods
|
||||
"""
|
||||
Миксин для динамического определения имени таблицы через __tablename__.
|
||||
|
||||
Имя таблицы определяется по имени класса в нижнем регистре.
|
||||
Если имя класса присутствует в TABLE_NAME_MAPPING, используется значение из маппинга.
|
||||
В противном случае используется имя класса как имя таблицы.
|
||||
"""
|
||||
|
||||
@declared_attr
|
||||
def __tablename__(cls) -> str: # pylint: disable=no-self-argument
|
||||
"""
|
||||
Динамически возвращает имя таблицы для модели.
|
||||
|
||||
Использует маппинг из конфигурации, если имя класса известно.
|
||||
В противном случае возвращает имя класса в нижнем регистре.
|
||||
|
||||
:return: Имя таблицы, используемое SQLAlchemy при работе с моделью.
|
||||
"""
|
||||
class_name = cls.__name__.lower()
|
||||
return TABLE_NAME_MAPPING.get(class_name, class_name)
|
||||
|
||||
service\src\gmap2\models\opu.py -
|
||||
"""
|
||||
ORM-модель для таблицы OPU.
|
||||
|
||||
Замечание:
|
||||
В БД отсутствует PRIMARY KEY. Для корректной работы SQLAlchemy
|
||||
используется виртуальный составной первичный ключ (wf_row_id, wf_load_id).
|
||||
Это не влияет на экспорт, но требуется для итерации через ORM.
|
||||
"""
|
||||
|
||||
from sqlalchemy import Column, Date, DateTime, Float, Integer, String
|
||||
|
||||
from .base import BASE, DynamicTableMixin
|
||||
|
||||
|
||||
class OPUData(DynamicTableMixin, BASE): # pylint: disable=too-few-public-methods
|
||||
"""
|
||||
Данные из таблицы brief_opu.
|
||||
"""
|
||||
|
||||
__table_args__ = {"schema": "brief_opu_schema"}
|
||||
|
||||
wf_row_id = Column(Integer, primary_key=True, nullable=False)
|
||||
wf_load_id = Column(Integer, primary_key=True, nullable=False)
|
||||
|
||||
object_id = Column(String)
|
||||
object_nm = Column(String)
|
||||
desk_nm = Column(String)
|
||||
object_tp = Column(String)
|
||||
object_unit = Column(String)
|
||||
actdate = Column(Date)
|
||||
layer_cd = Column(String)
|
||||
layer_nm = Column(String)
|
||||
measure = Column(String)
|
||||
opu_cd = Column(String)
|
||||
opu_nm_sh = Column(String)
|
||||
opu_nm = Column(String)
|
||||
opu_lvl = Column(Integer)
|
||||
product_nm = Column(String)
|
||||
opu_prnt_cd = Column(String)
|
||||
opu_prnt_nm_sh = Column(String)
|
||||
opu_prnt_nm = Column(String)
|
||||
product_prnt_nm = Column(String)
|
||||
sum_amountrub_p_usd = Column(Float)
|
||||
wf_load_dttm = Column(DateTime)
|
||||
|
||||
service\src\gmap2\repositories\base.py -
|
||||
"""
|
||||
Базовый репозиторий с общим функционалом стриминга.
|
||||
"""
|
||||
|
||||
from collections.abc import AsyncGenerator, Callable
|
||||
from typing import Generic, List, Optional, TypeVar, Union
|
||||
from typing_extensions import Self
|
||||
|
||||
from loguru import logger
|
||||
from sqlalchemy import Select, select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
_T_Model = TypeVar("_T_Model") # pylint: disable=invalid-name
|
||||
|
||||
|
||||
class BaseRepository(Generic[_T_Model]):
|
||||
"""
|
||||
Абстрактный репозиторий с поддержкой стриминга.
|
||||
"""
|
||||
|
||||
model: type[_T_Model]
|
||||
default_order_by: Union[List[Callable[[], List]], List] = []
|
||||
|
||||
def __init__(self, session: AsyncSession) -> None:
|
||||
"""
|
||||
Инициализирует репозиторий.
|
||||
|
||||
Args:
|
||||
session: Асинхронная сессия SQLAlchemy.
|
||||
"""
|
||||
self.session = session
|
||||
|
||||
async def stream_all(
|
||||
self,
|
||||
chunk_size: int = 10_000,
|
||||
statement: Optional[Select[tuple[_T_Model]]] = None,
|
||||
) -> AsyncGenerator[list[_T_Model], None]:
|
||||
"""
|
||||
Потоково извлекает записи из БД пачками.
|
||||
|
||||
Выполняет стриминг данных с использованием асинхронного курсора.
|
||||
Поддерживает кастомный запрос и автоматическое упорядочивание.
|
||||
|
||||
:param chunk_size: Размер пачки записей для одной итерации.
|
||||
:param statement: Опциональный SQL-запрос.
|
||||
:yield: Список экземпляров модели, загруженных порциями.
|
||||
"""
|
||||
logger.info(f"Streaming {self.model.__name__} in batches of {chunk_size}")
|
||||
|
||||
stmt = statement or select(self.model)
|
||||
|
||||
if not statement and self.default_order_by:
|
||||
stmt = stmt.order_by(*self.default_order_by)
|
||||
|
||||
try:
|
||||
result = await self.session.stream(
|
||||
stmt.execution_options(
|
||||
stream_results=True,
|
||||
max_row_count=chunk_size,
|
||||
)
|
||||
)
|
||||
|
||||
partitions_method = result.partitions
|
||||
partitions_result = partitions_method(chunk_size)
|
||||
|
||||
async for partition in partitions_result:
|
||||
items = [row[0] for row in partition]
|
||||
yield items
|
||||
|
||||
logger.info(f"Finished streaming {self.model.__name__}")
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Error during streaming {self.model.__name__}: {type(e).__name__}: {e}"
|
||||
)
|
||||
raise
|
||||
|
||||
@classmethod
|
||||
def create(cls, session: AsyncSession) -> Self:
|
||||
"""
|
||||
Фабричный метод.
|
||||
|
||||
Args:
|
||||
session: Асинхронная сессия.
|
||||
|
||||
Returns:
|
||||
Экземпляр репозитория.
|
||||
"""
|
||||
return cls(session=session)
|
||||
|
||||
service\src\gmap2\repositories\opu_repository.py -
|
||||
"""
|
||||
Репозиторий для OPU.
|
||||
"""
|
||||
|
||||
from gmap2.models.opu import OPUData
|
||||
|
||||
from .base import BaseRepository
|
||||
|
||||
|
||||
class OPURepository(BaseRepository[OPUData]):
|
||||
"""
|
||||
Репозиторий для OPUData.
|
||||
"""
|
||||
|
||||
model = OPUData
|
||||
default_order_by = [
|
||||
OPUData.wf_load_id,
|
||||
OPUData.wf_row_id,
|
||||
]
|
||||
|
||||
service\src\gmap2\api\v1\routes.py -
|
||||
"""
|
||||
Маршруты API для запуска, статуса и скачивания экспорта.
|
||||
"""
|
||||
|
||||
from fastapi import APIRouter, BackgroundTasks, HTTPException, Request
|
||||
|
||||
from gmap2.services.job.background_worker import (
|
||||
run_lpr_news_export_job,
|
||||
run_opu_export_job,
|
||||
)
|
||||
from gmap2.services.job.job_manager import JobManager
|
||||
from gmap2.utils.file_utils import create_range_aware_response
|
||||
|
||||
from .schemas import ExportJobStatus, StartExportResponse
|
||||
|
||||
router = APIRouter(prefix="/export", tags=["export"])
|
||||
|
||||
|
||||
def get_job_manager(request: Request) -> JobManager:
|
||||
"""
|
||||
Извлекает JobManager из application state.
|
||||
"""
|
||||
return request.app.state.job_manager
|
||||
|
||||
|
||||
@router.post("/opu/start", response_model=StartExportResponse)
|
||||
async def start_opu_export(
|
||||
request: Request, background_tasks: BackgroundTasks
|
||||
) -> StartExportResponse:
|
||||
"""
|
||||
Запускает фоновую задачу экспорта данных OPU.
|
||||
|
||||
Returns:
|
||||
Идентификатор задачи.
|
||||
"""
|
||||
job_manager = get_job_manager(request)
|
||||
job_id = job_manager.start_job("opu")
|
||||
|
||||
background_tasks.add_task(run_opu_export_job, job_id, job_manager)
|
||||
|
||||
return StartExportResponse(job_id=job_id)
|
||||
|
||||
|
||||
@router.post("/lpr-news/start", response_model=StartExportResponse)
|
||||
async def start_lpr_news_export(
|
||||
request: Request, background_tasks: BackgroundTasks
|
||||
) -> StartExportResponse:
|
||||
"""
|
||||
Запускает фоновую задачу экспорта данных LPR News.
|
||||
|
||||
Returns:
|
||||
Идентификатор задачи.
|
||||
"""
|
||||
job_manager = get_job_manager(request)
|
||||
job_id = job_manager.start_job("lpr_news")
|
||||
|
||||
background_tasks.add_task(run_lpr_news_export_job, job_id, job_manager)
|
||||
|
||||
return StartExportResponse(job_id=job_id)
|
||||
|
||||
|
||||
@router.get("/{job_id}/status", response_model=ExportJobStatus)
|
||||
async def get_export_status(job_id: str, request: Request) -> ExportJobStatus:
|
||||
"""
|
||||
Возвращает текущий статус задачи экспорта.
|
||||
|
||||
Args:
|
||||
job_id: Идентификатор задачи.
|
||||
|
||||
Returns:
|
||||
Статус задачи.
|
||||
|
||||
Raises:
|
||||
HTTPException 404: Если задача не найдена.
|
||||
"""
|
||||
job_manager = get_job_manager(request)
|
||||
status = job_manager.get_job_status(job_id)
|
||||
if not status:
|
||||
raise HTTPException(status_code=404, detail="Job not found")
|
||||
return ExportJobStatus(**status)
|
||||
|
||||
|
||||
@router.get("/{job_id}/download")
|
||||
async def download_export(job_id: str, request: Request):
|
||||
"""
|
||||
Стримит сжатый файл экспорта клиенту с поддержкой Range-запросов.
|
||||
"""
|
||||
job_manager = get_job_manager(request)
|
||||
status = job_manager.get_job_status(job_id)
|
||||
|
||||
if not status:
|
||||
raise HTTPException(status_code=404, detail="Job not found")
|
||||
|
||||
if status["status"] != "completed":
|
||||
raise HTTPException(
|
||||
status_code=409, detail=f"Job is {status['status']}, not completed"
|
||||
)
|
||||
|
||||
file_path = status.get("temp_file_path")
|
||||
if not file_path or not file_path.endswith(".jsonl.zst"):
|
||||
raise HTTPException(status_code=500, detail="Export file not available")
|
||||
|
||||
filename = f"export_{job_id}.jsonl.zst"
|
||||
|
||||
return await create_range_aware_response(
|
||||
file_path=file_path,
|
||||
filename=filename,
|
||||
request=request,
|
||||
media_type="application/octet-stream",
|
||||
)
|
||||
|
||||
service\src\gmap2\services\export\export_service.py -
|
||||
"""
|
||||
Сервис экспорта данных: объединяет репозиторий, форматирование и сжатие.
|
||||
Формат: JSON Lines + Zstandard (.jsonl.zst), один непрерывный zstd-фрейм.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import math
|
||||
import os
|
||||
import tempfile
|
||||
import threading
|
||||
from collections.abc import AsyncGenerator, Callable
|
||||
from contextlib import asynccontextmanager
|
||||
from datetime import date, datetime
|
||||
from typing import Any, Tuple
|
||||
|
||||
import aiofiles
|
||||
import zstandard as zstd
|
||||
from loguru import logger
|
||||
from orjson import OPT_NAIVE_UTC, OPT_UTC_Z # pylint: disable=no-name-in-module
|
||||
from orjson import dumps as orjson_dumps # pylint: disable=no-name-in-module
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from gmap2.repositories import LPRNewsRepository, OPURepository
|
||||
|
||||
from ..job.job_manager import JobManager
|
||||
from .formatters import models_to_dicts
|
||||
|
||||
|
||||
class _ZstdAsyncSink:
|
||||
"""
|
||||
Потокобезопасный приёмник для zstd.stream_writer.
|
||||
|
||||
Собирает сжатые данные по чанкам в памяти с использованием блокировки.
|
||||
Предназначен для использования в асинхронном контексте,
|
||||
где сжатие выполняется в отдельном потоке.
|
||||
"""
|
||||
|
||||
__slots__ = ("_chunks", "_lock")
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""
|
||||
Инициализирует приёмник с пустым списком чанков и блокировкой.
|
||||
"""
|
||||
self._chunks: list[bytes] = []
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def write(self, b: bytes) -> int:
|
||||
"""
|
||||
Записывает байтовый фрагмент в буфер.
|
||||
|
||||
Метод потокобезопасен.
|
||||
Копирует данные и добавляет в внутренний список чанков.
|
||||
|
||||
:param b: Байтовые данные для записи.
|
||||
:return: Длина записанных данных.
|
||||
"""
|
||||
with self._lock:
|
||||
self._chunks.append(bytes(b))
|
||||
return len(b)
|
||||
|
||||
def drain(self) -> list[bytes]:
|
||||
"""
|
||||
Извлекает все накопленные чанки, сбрасывая внутренний буфер.
|
||||
|
||||
Метод потокобезопасен.
|
||||
Возвращает список всех чанков, записанных с момента последнего сброса.
|
||||
|
||||
:return: Список байтовых фрагментов.
|
||||
"""
|
||||
with self._lock:
|
||||
chunks = self._chunks
|
||||
self._chunks = []
|
||||
return chunks
|
||||
|
||||
|
||||
class ExportService:
|
||||
"""
|
||||
Основной сервис экспорта данных в формате JSON Lines + zstd.
|
||||
"""
|
||||
|
||||
def __init__(self, chunk_size: int = 10_000, zstd_level: int = 3) -> None:
|
||||
self.chunk_size = chunk_size
|
||||
self.zstd_level = zstd_level
|
||||
|
||||
@asynccontextmanager
|
||||
async def _export_to_zstd( # pylint: disable=too-many-arguments,too-many-locals
|
||||
self,
|
||||
*,
|
||||
session: AsyncSession,
|
||||
job_id: str,
|
||||
job_manager: JobManager,
|
||||
repo_factory: Callable[[AsyncSession], Any],
|
||||
label: str,
|
||||
temp_dir: str | None = None,
|
||||
) -> AsyncGenerator[Tuple[AsyncGenerator[bytes, None], str], None]:
|
||||
repo = repo_factory(session)
|
||||
file_path: str | None = None
|
||||
try:
|
||||
with tempfile.NamedTemporaryFile(
|
||||
delete=False, suffix=".jsonl.zst", dir=temp_dir
|
||||
) as tmp:
|
||||
file_path = tmp.name
|
||||
|
||||
logger.info(f"[export] {label}: start -> {file_path}")
|
||||
|
||||
cctx = zstd.ZstdCompressor(level=self.zstd_level)
|
||||
sink = _ZstdAsyncSink()
|
||||
|
||||
async with aiofiles.open(file_path, "wb") as f:
|
||||
writer = cctx.stream_writer(sink)
|
||||
try:
|
||||
async for batch in repo.stream_all(chunk_size=self.chunk_size):
|
||||
if not batch:
|
||||
continue
|
||||
dicts = models_to_dicts(batch)
|
||||
payload = ("\n".join(_dumps(d) for d in dicts) + "\n").encode(
|
||||
"utf-8"
|
||||
)
|
||||
|
||||
await asyncio.to_thread(writer.write, payload)
|
||||
|
||||
for chunk in sink.drain():
|
||||
await f.write(chunk)
|
||||
|
||||
job_manager.increment_rows(job_id, len(batch))
|
||||
|
||||
await asyncio.to_thread(writer.flush, zstd.FLUSH_FRAME)
|
||||
|
||||
for chunk in sink.drain():
|
||||
await f.write(chunk)
|
||||
finally:
|
||||
await asyncio.to_thread(writer.close)
|
||||
for chunk in sink.drain():
|
||||
await f.write(chunk)
|
||||
await f.flush()
|
||||
|
||||
logger.info(f"[export] {label}: done -> {file_path}")
|
||||
yield _stream_file(file_path), file_path
|
||||
|
||||
except Exception:
|
||||
if file_path and os.path.exists(file_path):
|
||||
await asyncio.to_thread(os.remove, file_path)
|
||||
logger.exception(f"[export] {label}: failed, temporary file removed")
|
||||
raise
|
||||
|
||||
@asynccontextmanager
|
||||
async def export_opu_to_zstd(
|
||||
self,
|
||||
session: AsyncSession,
|
||||
job_id: str,
|
||||
job_manager: JobManager,
|
||||
temp_dir: str | None = None,
|
||||
) -> AsyncGenerator[Tuple[AsyncGenerator[bytes, None], str], None]:
|
||||
"""
|
||||
Экспорт OPU в один непрерывный zstd-поток.
|
||||
"""
|
||||
async with self._export_to_zstd(
|
||||
session=session,
|
||||
job_id=job_id,
|
||||
job_manager=job_manager,
|
||||
repo_factory=OPURepository,
|
||||
label="OPU",
|
||||
temp_dir=temp_dir,
|
||||
) as ctx:
|
||||
yield ctx
|
||||
|
||||
@asynccontextmanager
|
||||
async def export_lpr_news_to_zstd(
|
||||
self,
|
||||
session: AsyncSession,
|
||||
job_id: str,
|
||||
job_manager: JobManager,
|
||||
temp_dir: str | None = None,
|
||||
) -> AsyncGenerator[Tuple[AsyncGenerator[bytes, None], str], None]:
|
||||
"""
|
||||
Экспорт LPR News в один непрерывный zstd-поток.
|
||||
"""
|
||||
async with self._export_to_zstd(
|
||||
session=session,
|
||||
job_id=job_id,
|
||||
job_manager=job_manager,
|
||||
repo_factory=LPRNewsRepository,
|
||||
label="LPR News",
|
||||
temp_dir=temp_dir,
|
||||
) as ctx:
|
||||
yield ctx
|
||||
|
||||
|
||||
def _dumps(data: dict) -> str:
|
||||
"""
|
||||
Сериализует словарь в JSON-строку (orjson).
|
||||
"""
|
||||
|
||||
return orjson_dumps(
|
||||
data,
|
||||
default=_serialize_value,
|
||||
option=OPT_UTC_Z | OPT_NAIVE_UTC,
|
||||
).decode("utf-8")
|
||||
|
||||
|
||||
def _serialize_value(value: Any) -> Any:
|
||||
"""
|
||||
Преобразует значения к JSON-совместимому виду.
|
||||
"""
|
||||
if isinstance(value, (datetime, date)):
|
||||
return value.isoformat()
|
||||
if isinstance(value, float) and not math.isfinite(value):
|
||||
return None
|
||||
return value
|
||||
|
||||
|
||||
async def _stream_file(
|
||||
file_path: str, chunk_size: int = 8192
|
||||
) -> AsyncGenerator[bytes, None]:
|
||||
"""
|
||||
Асинхронно читает файл блоками.
|
||||
"""
|
||||
async with aiofiles.open(file_path, "rb") as f:
|
||||
while chunk := await f.read(chunk_size):
|
||||
yield chunk
|
||||
|
||||
service\src\gmap2\services\export\compressors.py -
|
||||
"""
|
||||
Работа со сжатием данных с использованием zstandard.
|
||||
"""
|
||||
|
||||
from typing import BinaryIO
|
||||
|
||||
import zstandard as zstd
|
||||
|
||||
|
||||
def create_zstd_writer(fileobj: BinaryIO, level: int = 3) -> zstd.ZstdCompressionWriter:
|
||||
"""
|
||||
Создаёт сжатый writer поверх бинарного файла.
|
||||
|
||||
Args:
|
||||
fileobj: Целевой файл (например, tempfile).
|
||||
level: Уровень сжатия (1–10). По умолчанию 3 - баланс скорости и размера.
|
||||
|
||||
Returns:
|
||||
Объект для записи сжатых данных.
|
||||
"""
|
||||
cctx = zstd.ZstdCompressor(level=level)
|
||||
return cctx.stream_writer(fileobj)
|
||||
|
||||
service\src\gmap2\services\export\formatters.py -
|
||||
"""
|
||||
Форматирование ORM-объектов в словари.
|
||||
С оптимизацией: кеширование структуры модели.
|
||||
"""
|
||||
|
||||
from datetime import date, datetime
|
||||
from decimal import Decimal
|
||||
from typing import Any, Dict, List
|
||||
from weakref import WeakKeyDictionary
|
||||
|
||||
from sqlalchemy import inspect
|
||||
from sqlalchemy.orm import RelationshipProperty
|
||||
|
||||
_columns_cache: WeakKeyDictionary = WeakKeyDictionary()
|
||||
|
||||
|
||||
def _get_model_columns(model_instance: Any) -> List[str]:
|
||||
"""
|
||||
Возвращает список имен колонок модели (без отношений).
|
||||
Кеширует результат.
|
||||
"""
|
||||
model_class = model_instance.__class__
|
||||
if model_class not in _columns_cache:
|
||||
mapper = inspect(model_class)
|
||||
columns = [
|
||||
attr.key
|
||||
for attr in mapper.attrs
|
||||
if not isinstance(attr, RelationshipProperty)
|
||||
]
|
||||
_columns_cache[model_class] = columns
|
||||
return _columns_cache[model_class]
|
||||
|
||||
|
||||
def _serialize_value(value: Any) -> Any:
|
||||
"""
|
||||
Сериализует значение в JSON-совместимый формат.
|
||||
"""
|
||||
if isinstance(value, (datetime, date)):
|
||||
return value.isoformat()
|
||||
if isinstance(value, Decimal):
|
||||
return float(value)
|
||||
if value is None:
|
||||
return None
|
||||
return value
|
||||
|
||||
|
||||
def models_to_dicts(instances: List[Any]) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Массовое преобразование списка ORM-объектов в словари.
|
||||
|
||||
Использует кеширование структуры модели для высокой производительности.
|
||||
|
||||
Args:
|
||||
instances: Список ORM-объектов.
|
||||
|
||||
Returns:
|
||||
Список словарей.
|
||||
"""
|
||||
if not instances:
|
||||
return []
|
||||
|
||||
columns = _get_model_columns(instances[0])
|
||||
return [
|
||||
{col: _serialize_value(getattr(obj, col)) for col in columns}
|
||||
for obj in instances
|
||||
]
|
||||
|
||||
|
||||
__all__ = [
|
||||
"models_to_dicts",
|
||||
]
|
||||
|
||||
service\src\gmap2\services\job\background_worker.py -
|
||||
"""
|
||||
Фоновые задачи экспорта.
|
||||
"""
|
||||
|
||||
from psycopg import errors as pg_errors
|
||||
from loguru import logger
|
||||
from sqlalchemy.exc import DatabaseError, OperationalError
|
||||
from tenacity import (
|
||||
retry,
|
||||
retry_if_exception_type,
|
||||
stop_after_attempt,
|
||||
wait_exponential,
|
||||
)
|
||||
|
||||
from gmap2.context import APP_CTX
|
||||
from gmap2.services.export.export_service import ExportService
|
||||
|
||||
from typing import Optional
|
||||
|
||||
TRANSIENT_ERRORS = (
|
||||
OperationalError,
|
||||
DatabaseError,
|
||||
ConnectionError,
|
||||
TimeoutError,
|
||||
pg_errors.ConnectionException,
|
||||
pg_errors.AdminShutdown,
|
||||
pg_errors.CannotConnectNow,
|
||||
)
|
||||
|
||||
|
||||
def _create_export_job(
|
||||
export_method_name: str,
|
||||
job_type: str,
|
||||
):
|
||||
"""
|
||||
Фабрика фоновых задач экспорта.
|
||||
|
||||
Args:
|
||||
export_method_name: Название метода экспорта в ExportService.
|
||||
job_type: Тип задачи ("opu", "lpr_news").
|
||||
|
||||
Returns:
|
||||
Асинхронная функция-задача.
|
||||
"""
|
||||
|
||||
@retry(
|
||||
stop=stop_after_attempt(10),
|
||||
wait=wait_exponential(multiplier=1, max=60),
|
||||
retry=retry_if_exception_type(TRANSIENT_ERRORS),
|
||||
reraise=True,
|
||||
)
|
||||
async def run_export_job(
|
||||
job_id: str, job_manager, temp_dir: Optional[str] = None
|
||||
) -> None:
|
||||
export_service = ExportService()
|
||||
try:
|
||||
async with APP_CTX.get_db_session() as session:
|
||||
job_manager.mark_running(job_id)
|
||||
|
||||
export_method = getattr(export_service, export_method_name)
|
||||
async with export_method(
|
||||
session=session,
|
||||
job_id=job_id,
|
||||
job_manager=job_manager,
|
||||
temp_dir=temp_dir,
|
||||
) as (stream, file_path):
|
||||
async for _ in stream:
|
||||
pass
|
||||
|
||||
job_manager.mark_completed(job_id, file_path)
|
||||
|
||||
except Exception as e:
|
||||
if not isinstance(e, TRANSIENT_ERRORS):
|
||||
job_manager.mark_failed(job_id, f"{type(e).__name__}: {e}")
|
||||
logger.exception(f"Export job {job_id} ({job_type}) failed")
|
||||
raise
|
||||
|
||||
run_export_job.__name__ = f"run_{job_type}_export_job"
|
||||
run_export_job.__doc__ = f"Фоновая задача экспорта {job_type.upper()} с retry."
|
||||
|
||||
return run_export_job
|
||||
|
||||
|
||||
run_opu_export_job = _create_export_job("export_opu_to_zstd", "opu")
|
||||
run_lpr_news_export_job = _create_export_job("export_lpr_news_to_zstd", "lpr_news")
|
||||
File diff suppressed because it is too large
Load Diff
526
README.md
526
README.md
|
|
@ -153,19 +153,108 @@ src/dataloader/
|
|||
|
||||
## HTTP API (v1)
|
||||
|
||||
- POST `/api/v1/jobs/trigger`
|
||||
- Вход: `{queue, task, args?, idempotency_key?, lock_key, partition_key?, priority?, available_at?, max_attempts?, lease_ttl_sec?, producer?, consumer_group?}`
|
||||
- Выход: `{job_id: UUID, status: str}`
|
||||
- Идемпотентность по `idempotency_key` (если задан).
|
||||
### POST `/api/v1/jobs/trigger`
|
||||
|
||||
- GET `/api/v1/jobs/{job_id}/status`
|
||||
- Выход: `{job_id, status, attempt, started_at?, finished_at?, heartbeat_at?, error?, progress: {}}`
|
||||
Постановка задачи в очередь (идемпотентная операция).
|
||||
|
||||
- POST `/api/v1/jobs/{job_id}/cancel`
|
||||
- Выход: как в status
|
||||
- Поведение: устанавливает `cancel_requested = true`; воркер завершает между шагами пайплайна.
|
||||
**Request:**
|
||||
```json
|
||||
{
|
||||
"queue": "load.cbr", // обязательно: имя очереди
|
||||
"task": "load.cbr.rates", // обязательно: имя задачи для registry
|
||||
"args": { // опционально: аргументы задачи
|
||||
"date": "2025-01-10",
|
||||
"currencies": ["USD", "EUR"]
|
||||
},
|
||||
"idempotency_key": "cbr_2025-01-10", // опционально: ключ идемпотентности
|
||||
"lock_key": "cbr_rates", // обязательно: ключ для advisory lock
|
||||
"partition_key": "2025-01-10", // опционально: ключ партиционирования
|
||||
"priority": 100, // опционально: приоритет (меньше = выше)
|
||||
"available_at": "2025-01-10T00:00:00Z", // опционально: отложенный запуск
|
||||
"max_attempts": 3, // опционально: макс попыток (def: 5)
|
||||
"lease_ttl_sec": 300, // опционально: TTL аренды (def: 60)
|
||||
"producer": "api-client", // опционально: кто поставил
|
||||
"consumer_group": "cbr-loaders" // опционально: группа потребителей
|
||||
}
|
||||
```
|
||||
|
||||
Инфраструктурные: `/health`, `/info`.
|
||||
**Response 200:**
|
||||
```json
|
||||
{
|
||||
"job_id": "550e8400-e29b-41d4-a716-446655440000",
|
||||
"status": "queued"
|
||||
}
|
||||
```
|
||||
|
||||
**Коды ответов:**
|
||||
- `200 OK` - задача создана или уже существует (идемпотентность)
|
||||
- `400 Bad Request` - невалидные данные
|
||||
- `500 Internal Server Error` - ошибка сервера
|
||||
|
||||
### GET `/api/v1/jobs/{job_id}/status`
|
||||
|
||||
Получение статуса задачи.
|
||||
|
||||
**Response 200:**
|
||||
```json
|
||||
{
|
||||
"job_id": "550e8400-e29b-41d4-a716-446655440000",
|
||||
"status": "running", // queued/running/succeeded/failed/canceled
|
||||
"attempt": 1, // текущая попытка
|
||||
"started_at": "2025-01-10T12:00:00Z", // время первого запуска
|
||||
"finished_at": null, // время завершения (если есть)
|
||||
"heartbeat_at": "2025-01-10T12:01:30Z", // последний heartbeat
|
||||
"error": null, // текст ошибки (если есть)
|
||||
"progress": { // прогресс выполнения (custom)
|
||||
"processed": 500,
|
||||
"total": 1000
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**Коды ответов:**
|
||||
- `200 OK` - статус получен
|
||||
- `404 Not Found` - задача не найдена
|
||||
|
||||
### POST `/api/v1/jobs/{job_id}/cancel`
|
||||
|
||||
Запрос кооперативной отмены задачи.
|
||||
|
||||
**Response 200:**
|
||||
```json
|
||||
{
|
||||
"job_id": "550e8400-e29b-41d4-a716-446655440000",
|
||||
"status": "running",
|
||||
"attempt": 1,
|
||||
"started_at": "2025-01-10T12:00:00Z",
|
||||
"heartbeat_at": "2025-01-10T12:01:30Z"
|
||||
}
|
||||
```
|
||||
|
||||
**Поведение:**
|
||||
- Устанавливает флаг `cancel_requested = true` в БД
|
||||
- Воркер проверяет флаг между `yield` в пайплайне
|
||||
- При обнаружении флага воркер завершает задачу со статусом `canceled`
|
||||
|
||||
**Коды ответов:**
|
||||
- `200 OK` - запрос отмены принят
|
||||
- `404 Not Found` - задача не найдена
|
||||
|
||||
### Инфраструктурные эндпоинты
|
||||
|
||||
**GET `/health`** - проверка работоспособности (без БД, < 20ms)
|
||||
```json
|
||||
{"status": "healthy"}
|
||||
```
|
||||
|
||||
**GET `/info`** - информация о сервисе
|
||||
```json
|
||||
{
|
||||
"service": "dataloader",
|
||||
"version": "1.0.0",
|
||||
"environment": "production"
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
## Воркеры, пайплайны и добавление новых ETL‑задач
|
||||
|
|
@ -178,6 +267,78 @@ src/dataloader/
|
|||
5. Выполняет пайплайн (`task`) с поддержкой итеративных шагов и кооперативной отмены.
|
||||
6. По завершении: `succeeded` или `failed`/`canceled`; при ошибках возможны ретраи до `max_attempts`.
|
||||
|
||||
### Протокол выполнения задачи (SQL)
|
||||
|
||||
**Claim (захват задачи):**
|
||||
```sql
|
||||
WITH cte AS (
|
||||
SELECT job_id
|
||||
FROM dl_jobs
|
||||
WHERE status = 'queued'
|
||||
AND queue = :queue
|
||||
AND available_at <= now()
|
||||
ORDER BY priority ASC, created_at ASC
|
||||
FOR UPDATE SKIP LOCKED
|
||||
LIMIT 1
|
||||
)
|
||||
UPDATE dl_jobs j
|
||||
SET status = 'running',
|
||||
started_at = COALESCE(started_at, now()),
|
||||
attempt = attempt + 1,
|
||||
lease_expires_at = now() + make_interval(secs => j.lease_ttl_sec),
|
||||
heartbeat_at = now()
|
||||
FROM cte
|
||||
WHERE j.job_id = cte.job_id
|
||||
RETURNING j.job_id, j.task, j.args, j.lock_key, j.lease_ttl_sec;
|
||||
```
|
||||
|
||||
**Heartbeat (продление аренды):**
|
||||
```sql
|
||||
UPDATE dl_jobs
|
||||
SET heartbeat_at = now(),
|
||||
lease_expires_at = now() + make_interval(secs => :ttl)
|
||||
WHERE job_id = :job_id AND status = 'running'
|
||||
RETURNING cancel_requested;
|
||||
```
|
||||
|
||||
**Завершение успешное:**
|
||||
```sql
|
||||
UPDATE dl_jobs
|
||||
SET status = 'succeeded',
|
||||
finished_at = now(),
|
||||
lease_expires_at = NULL
|
||||
WHERE job_id = :job_id;
|
||||
|
||||
SELECT pg_advisory_unlock(hashtext(:lock_key));
|
||||
```
|
||||
|
||||
**Завершение с ошибкой (retry):**
|
||||
```sql
|
||||
UPDATE dl_jobs
|
||||
SET status = CASE WHEN attempt < max_attempts THEN 'queued' ELSE 'failed' END,
|
||||
available_at = CASE WHEN attempt < max_attempts
|
||||
THEN now() + make_interval(secs => 30 * attempt)
|
||||
ELSE now() END,
|
||||
error = :error_message,
|
||||
lease_expires_at = NULL,
|
||||
finished_at = CASE WHEN attempt >= max_attempts THEN now() ELSE NULL END
|
||||
WHERE job_id = :job_id;
|
||||
|
||||
SELECT pg_advisory_unlock(hashtext(:lock_key));
|
||||
```
|
||||
|
||||
**Reaper (возврат потерянных задач):**
|
||||
```sql
|
||||
UPDATE dl_jobs
|
||||
SET status = 'queued',
|
||||
available_at = now(),
|
||||
lease_expires_at = NULL
|
||||
WHERE status = 'running'
|
||||
AND lease_expires_at IS NOT NULL
|
||||
AND lease_expires_at < now()
|
||||
RETURNING job_id;
|
||||
```
|
||||
|
||||
### Интерфейс пайплайна
|
||||
Пайплайн - обычная функция, возвращающая одно из:
|
||||
- асинхронный генератор шагов (рекомендуется для длинных процессов),
|
||||
|
|
@ -214,11 +375,84 @@ async def load_customers(args: dict) -> AsyncIterator[None]:
|
|||
- Для бизнес‑взаимного исключения выбирайте корректный `lock_key` (например, `customer:{id}`), чтобы параллельные задачи не конфликтовали.
|
||||
|
||||
### Добавление ETL‑задачи (шаги)
|
||||
1. Создать модуль в `workers/pipelines/` и зарегистрировать обработчик через `@register`.
|
||||
2. Убедиться, что модуль импортируется автоматически (происходит на старте через `load_all()`).
|
||||
3. При необходимости расширить схему `args` и валидацию на стороне продьюсера (кто вызывает API).
|
||||
4. При постановке задачи в `POST /api/v1/jobs/trigger` указать `task` с новым именем и желаемые `args`.
|
||||
5. При необходимости завести отдельную очередь (`queue`) и добавить её в `WORKERS_JSON` с нужной `concurrency`.
|
||||
|
||||
**1. Создать пайплайн** в `src/dataloader/workers/pipelines/`:
|
||||
|
||||
```python
|
||||
# src/dataloader/workers/pipelines/load_cbr_rates.py
|
||||
from __future__ import annotations
|
||||
from typing import AsyncIterator
|
||||
from datetime import datetime
|
||||
from dataloader.workers.pipelines.registry import register
|
||||
|
||||
@register("load.cbr.rates")
|
||||
async def load_cbr_rates(args: dict) -> AsyncIterator[None]:
|
||||
"""
|
||||
Загрузка курсов валют ЦБ РФ.
|
||||
|
||||
Args:
|
||||
args: {"date": "YYYY-MM-DD", "currencies": ["USD", "EUR"]}
|
||||
"""
|
||||
date = datetime.fromisoformat(args["date"])
|
||||
currencies = args.get("currencies", ["USD", "EUR"])
|
||||
|
||||
# Извлечение данных
|
||||
data = await fetch_cbr_rates(date, currencies)
|
||||
yield # Heartbeat checkpoint
|
||||
|
||||
# Трансформация
|
||||
transformed = transform_rates(data)
|
||||
yield # Heartbeat checkpoint
|
||||
|
||||
# Идемпотентная загрузка в БД
|
||||
async with get_target_session() as session:
|
||||
await session.execute(
|
||||
insert(CbrRates)
|
||||
.values(transformed)
|
||||
.on_conflict_do_update(
|
||||
index_elements=["date", "currency"],
|
||||
set_={"rate": excluded.c.rate, "updated_at": func.now()}
|
||||
)
|
||||
)
|
||||
await session.commit()
|
||||
yield
|
||||
```
|
||||
|
||||
**2. Настроить воркеры** в `.env`:
|
||||
|
||||
```bash
|
||||
WORKERS_JSON='[{"queue":"load.cbr","concurrency":2}]'
|
||||
```
|
||||
|
||||
**3. Поставить задачу через API**:
|
||||
|
||||
```bash
|
||||
curl -X POST http://localhost:8081/api/v1/jobs/trigger \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{
|
||||
"queue": "load.cbr",
|
||||
"task": "load.cbr.rates",
|
||||
"args": {"date": "2025-01-10", "currencies": ["USD", "EUR"]},
|
||||
"lock_key": "cbr_rates_2025-01-10",
|
||||
"partition_key": "2025-01-10",
|
||||
"priority": 100,
|
||||
"max_attempts": 3,
|
||||
"lease_ttl_sec": 300
|
||||
}'
|
||||
```
|
||||
|
||||
**4. Мониторить выполнение**:
|
||||
|
||||
```bash
|
||||
# Получить job_id из ответа и проверить статус
|
||||
curl http://localhost:8081/api/v1/jobs/{job_id}/status
|
||||
```
|
||||
|
||||
**Ключевые моменты**:
|
||||
- Пайплайн должен быть идемпотентным (повторный запуск не должен ломать данные)
|
||||
- Используйте `yield` после каждого значимого чанка работы для heartbeat
|
||||
- `lock_key` должен обеспечивать взаимное исключение (например, `customer:{id}`)
|
||||
- `partition_key` используется для параллелизации независимых задач
|
||||
|
||||
|
||||
## Логирование, метрики, аудит
|
||||
|
|
@ -228,19 +462,257 @@ async def load_customers(args: dict) -> AsyncIterator[None]:
|
|||
|
||||
|
||||
## Тестирование
|
||||
- Юнит‑тесты: `tests/unit/*`
|
||||
- Интеграционные тесты: `tests/integration_tests/*` - покрывают API, репозиторий очереди, reaper.
|
||||
- Запуск:
|
||||
```bash
|
||||
poetry run pytest -q
|
||||
```
|
||||
|
||||
### Структура тестов
|
||||
|
||||
```
|
||||
tests/
|
||||
├── conftest.py # Глобальные фикстуры (db_engine, db_session, client)
|
||||
├── integration_tests/ # Интеграционные тесты с реальной БД
|
||||
│ ├── test_queue_repository.py # 12 тестов репозитория
|
||||
│ └── test_api_endpoints.py # 7 тестов API endpoints
|
||||
└── unit/ # Юнит-тесты с моками (92 теста)
|
||||
├── test_config.py # 30 тестов конфигурации
|
||||
├── test_context.py # 13 тестов AppContext
|
||||
├── test_api_service.py # 10 тестов сервисного слоя
|
||||
├── test_notify_listener.py # 13 тестов LISTEN/NOTIFY
|
||||
├── test_workers_base.py # 14 тестов PGWorker
|
||||
├── test_workers_manager.py # 10 тестов WorkerManager
|
||||
└── test_pipeline_registry.py # 5 тестов реестра пайплайнов
|
||||
```
|
||||
|
||||
### Запуск тестов
|
||||
|
||||
```bash
|
||||
# Все тесты (111 тестов)
|
||||
poetry run pytest
|
||||
|
||||
# Только юнит-тесты
|
||||
poetry run pytest tests/unit/ -m unit
|
||||
|
||||
# Только интеграционные
|
||||
poetry run pytest tests/integration_tests/ -m integration
|
||||
|
||||
# С покрытием кода
|
||||
poetry run pytest --cov=dataloader --cov-report=html
|
||||
|
||||
# С подробным выводом
|
||||
poetry run pytest -v -s
|
||||
```
|
||||
|
||||
### Покрытие кода
|
||||
|
||||
Текущее покрытие: **91%** (788 строк / 715 покрыто)
|
||||
|
||||
```
|
||||
Name Stmts Miss Cover
|
||||
---------------------------------------------------------------
|
||||
src/dataloader/config.py 79 0 100%
|
||||
src/dataloader/context.py 39 0 100%
|
||||
src/dataloader/api/v1/service.py 32 0 100%
|
||||
src/dataloader/storage/models/queue.py 43 0 100%
|
||||
src/dataloader/storage/schemas/queue.py 29 0 100%
|
||||
src/dataloader/storage/notify_listener.py 49 0 100%
|
||||
src/dataloader/workers/base.py 102 3 97%
|
||||
src/dataloader/workers/manager.py 64 0 100%
|
||||
src/dataloader/storage/repositories/queue 130 12 91%
|
||||
---------------------------------------------------------------
|
||||
TOTAL 788 73 91%
|
||||
```
|
||||
|
||||
### Ключевые тест-сценарии
|
||||
|
||||
**Интеграционные тесты:**
|
||||
- Постановка задачи через API → проверка статуса
|
||||
- Идемпотентность через `idempotency_key`
|
||||
- Claim задачи → heartbeat → успешное завершение
|
||||
- Claim задачи → ошибка → retry → финальный fail
|
||||
- Конкуренция воркеров через advisory lock
|
||||
- Возврат потерянных задач (reaper)
|
||||
- Отмена задачи пользователем
|
||||
|
||||
**Юнит-тесты:**
|
||||
- Конфигурация из переменных окружения
|
||||
- Создание и управление воркерами
|
||||
- LISTEN/NOTIFY механизм
|
||||
- Сервисный слой и репозиторий
|
||||
- Протокол heartbeat и отмены
|
||||
|
||||
|
||||
## Эксплуатация и масштабирование
|
||||
- Один процесс FastAPI с пулом асинхронных воркеров внутри. Масштабирование - количеством реплик (Deployment). Очередь в БД и advisory‑lock обеспечат корректность при гонках между репликами.
|
||||
- Readiness/Liveness - `/health`.
|
||||
- Корректное завершение: при SIGTERM менеджер подаёт сигнал остановки, завершает воркеры и reaper, закрывает соединения.
|
||||
### Масштабирование
|
||||
|
||||
- **Вертикальное**: Увеличение `concurrency` в `WORKERS_JSON` для существующих воркеров
|
||||
- **Горизонтальное**: Увеличение количества реплик (pods). Очередь в БД и advisory-lock обеспечат корректность при конкуренции между репликами
|
||||
- **По очередям**: Разные deployment'ы для разных очередей с разными ресурсами
|
||||
|
||||
### Graceful Shutdown
|
||||
|
||||
При получении SIGTERM:
|
||||
1. Останавливает прием новых HTTP запросов
|
||||
2. Сигнализирует воркерам о необходимости завершения
|
||||
3. Ждет завершения текущих задач (timeout 30 сек)
|
||||
4. Останавливает reaper
|
||||
5. Закрывает соединения с БД
|
||||
|
||||
### Мониторинг
|
||||
|
||||
**Health Checks:**
|
||||
- `GET /health` - проверка работоспособности (без БД, < 20ms)
|
||||
- `GET /info` - информация о версии
|
||||
|
||||
**Метрики (если включен metric_router):**
|
||||
- Количество задач по статусам (queued, running, succeeded, failed)
|
||||
- Время выполнения задач (p50, p95, p99)
|
||||
- Количество активных воркеров
|
||||
- Частота ошибок
|
||||
|
||||
**Логи:**
|
||||
Структурированные JSON-логи с уровнями: DEBUG, INFO, WARNING, ERROR, CRITICAL
|
||||
|
||||
**Ключевые события для алертов:**
|
||||
- `worker.claim.backoff` - частые backoff'ы (возможна конкуренция)
|
||||
- `worker.complete.failed` - высокий процент ошибок
|
||||
- `reaper.requeued` - частый возврат потерянных задач (проблемы с lease)
|
||||
- `api.error` - ошибки API
|
||||
|
||||
|
||||
## Лицензия
|
||||
Внутренний корпоративный проект.
|
||||
## Troubleshooting
|
||||
|
||||
### Задачи застревают в статусе `queued`
|
||||
|
||||
**Симптомы:** Задачи не начинают выполняться, остаются в `queued`.
|
||||
|
||||
**Возможные причины:**
|
||||
1. Воркеры не запущены или упали
|
||||
2. Нет воркеров для данной очереди в `WORKERS_JSON`
|
||||
3. `available_at` в будущем
|
||||
|
||||
**Решение:**
|
||||
```bash
|
||||
# Проверить логи воркеров
|
||||
docker logs dataloader | grep worker
|
||||
|
||||
# Проверить конфигурацию
|
||||
echo $WORKERS_JSON
|
||||
|
||||
# Проверить задачи в БД
|
||||
SELECT job_id, queue, status, available_at, created_at
|
||||
FROM dl_jobs
|
||||
WHERE status = 'queued'
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 10;
|
||||
```
|
||||
|
||||
### Задачи часто возвращаются в `queued` (backoff)
|
||||
|
||||
**Симптомы:** В логах частые события `worker.claim.backoff`.
|
||||
|
||||
**Причины:**
|
||||
- Конкуренция за `lock_key`: несколько задач с одинаковым `lock_key` одновременно
|
||||
- Advisory lock уже занят другим процессом
|
||||
|
||||
**Решение:**
|
||||
- Проверить корректность выбора `lock_key` (должен быть уникальным для бизнес-сущности)
|
||||
- Использовать `partition_key` для распределения нагрузки
|
||||
- Снизить `concurrency` для данной очереди
|
||||
|
||||
### Высокий процент `failed` задач
|
||||
|
||||
**Симптомы:** Много задач завершаются с `status = 'failed'`.
|
||||
|
||||
**Диагностика:**
|
||||
```sql
|
||||
SELECT job_id, task, error, attempt, max_attempts
|
||||
FROM dl_jobs
|
||||
WHERE status = 'failed'
|
||||
ORDER BY finished_at DESC
|
||||
LIMIT 20;
|
||||
```
|
||||
|
||||
**Возможные причины:**
|
||||
- Ошибки в коде пайплайна
|
||||
- Недоступность внешних сервисов
|
||||
- Таймауты (превышение `lease_ttl_sec`)
|
||||
- Неверные аргументы в `args`
|
||||
|
||||
**Решение:**
|
||||
- Проверить логи с `job_id`
|
||||
- Увеличить `max_attempts` для retry
|
||||
- Увеличить `lease_ttl_sec` для долгих операций
|
||||
- Исправить код пайплайна
|
||||
|
||||
|
||||
### Медленное выполнение задач
|
||||
|
||||
**Симптомы:** Задачи выполняются дольше ожидаемого.
|
||||
|
||||
**Диагностика:**
|
||||
```sql
|
||||
SELECT
|
||||
task,
|
||||
AVG(EXTRACT(EPOCH FROM (finished_at - started_at))) as avg_duration_sec,
|
||||
COUNT(*) as total
|
||||
FROM dl_jobs
|
||||
WHERE status IN ('succeeded', 'failed')
|
||||
AND finished_at > NOW() - INTERVAL '1 hour'
|
||||
GROUP BY task
|
||||
ORDER BY avg_duration_sec DESC;
|
||||
```
|
||||
|
||||
**Возможные причины:**
|
||||
- Неоптимальный код пайплайна
|
||||
- Медленные внешние сервисы
|
||||
- Недостаточно воркеров (`concurrency` слишком мал)
|
||||
- Проблемы с БД (медленные запросы, блокировки)
|
||||
|
||||
**Решение:**
|
||||
- Профилировать код пайплайна
|
||||
- Увеличить `concurrency` в `WORKERS_JSON`
|
||||
- Оптимизировать запросы к БД (индексы, batching)
|
||||
- Масштабировать горизонтально (больше реплик)
|
||||
|
||||
### Утечка памяти
|
||||
|
||||
**Симптомы:** Постепенный рост потребления памяти, OOM kills.
|
||||
|
||||
**Диагностика:**
|
||||
```bash
|
||||
# Мониторинг памяти
|
||||
kubectl top pods -l app=dataloader --containers
|
||||
|
||||
# Проверить логи перед падением
|
||||
kubectl logs dataloader-xxx --previous
|
||||
```
|
||||
|
||||
**Возможные причины:**
|
||||
- Накопление объектов в памяти в пайплайне
|
||||
- Незакрытые соединения/файлы
|
||||
- Утечки в зависимостях
|
||||
|
||||
**Решение:**
|
||||
- Использовать context managers (`async with`) для ресурсов
|
||||
- Обрабатывать данные чанками, не загружать всё в память
|
||||
- Периодически перезапускать воркеры (restart policy)
|
||||
|
||||
### Проблемы с LISTEN/NOTIFY
|
||||
|
||||
**Симптомы:** Воркеры не просыпаются сразу после постановки задачи.
|
||||
|
||||
**Диагностика:**
|
||||
```bash
|
||||
# Проверить логи listener
|
||||
docker logs dataloader | grep "notify_listener\|LISTEN"
|
||||
|
||||
# Проверить триггеры в БД
|
||||
SELECT * FROM pg_trigger WHERE tgname LIKE 'dl_jobs%';
|
||||
```
|
||||
|
||||
**Возможные причины:**
|
||||
- Триггеры не созданы или отключены
|
||||
- Проблемы с подключением asyncpg
|
||||
- Воркер не подписан на канал
|
||||
|
||||
**Решение:**
|
||||
- Воркер автоматически fallback'ится на polling при проблемах с LISTEN/NOTIFY
|
||||
- Проверить DDL: триггеры `dl_jobs_notify_ins` и `dl_jobs_notify_upd`
|
||||
- Проверить права пользователя БД на LISTEN/NOTIFY
|
||||
|
|
|
|||
|
|
@ -13,12 +13,4 @@ class JobNotFoundError(HTTPException):
|
|||
)
|
||||
|
||||
|
||||
class JobAlreadyCanceledError(HTTPException):
|
||||
"""Задача уже отменена."""
|
||||
|
||||
def __init__(self, job_id: str):
|
||||
super().__init__(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail=f"Job {job_id} is already canceled or finished"
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -6,8 +6,9 @@ from http import HTTPStatus
|
|||
from typing import Annotated
|
||||
from uuid import UUID
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException
|
||||
from fastapi import APIRouter, Depends
|
||||
|
||||
from dataloader.api.v1.exceptions import JobNotFoundError
|
||||
from dataloader.api.v1.schemas import (
|
||||
JobStatusResponse,
|
||||
TriggerJobRequest,
|
||||
|
|
@ -49,7 +50,7 @@ async def get_status(
|
|||
"""
|
||||
st = await svc.status(job_id)
|
||||
if not st:
|
||||
raise HTTPException(status_code=HTTPStatus.NOT_FOUND, detail="job not found")
|
||||
raise JobNotFoundError(job_id=str(job_id))
|
||||
return st
|
||||
|
||||
|
||||
|
|
@ -63,5 +64,5 @@ async def cancel_job(
|
|||
"""
|
||||
st = await svc.cancel(job_id)
|
||||
if not st:
|
||||
raise HTTPException(status_code=HTTPStatus.NOT_FOUND, detail="job not found")
|
||||
raise JobNotFoundError(job_id=str(job_id))
|
||||
return st
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ from dataloader.storage.engine import create_engine, create_sessionmaker
|
|||
if sys.platform == "win32":
|
||||
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
|
||||
|
||||
pytestmark = pytest.mark.asyncio
|
||||
|
||||
@pytest_asyncio.fixture(scope="function")
|
||||
async def db_engine() -> AsyncGenerator[AsyncEngine, None]:
|
||||
|
|
|
|||
|
|
@ -0,0 +1,33 @@
|
|||
# tests/unit/test_api_router_not_found.py
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
from uuid import uuid4, UUID
|
||||
|
||||
from dataloader.api.v1.router import get_status, cancel_job
|
||||
from dataloader.api.v1.exceptions import JobNotFoundError
|
||||
from dataloader.api.v1.schemas import JobStatusResponse
|
||||
|
||||
|
||||
class _FakeSvc:
|
||||
async def status(self, job_id: UUID) -> JobStatusResponse | None:
|
||||
return None
|
||||
|
||||
async def cancel(self, job_id: UUID) -> JobStatusResponse | None:
|
||||
return None
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.asyncio
|
||||
async def test_router_get_status_raises_job_not_found():
|
||||
svc = _FakeSvc()
|
||||
with pytest.raises(JobNotFoundError):
|
||||
await get_status(uuid4(), svc=svc)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.asyncio
|
||||
async def test_router_cancel_raises_job_not_found():
|
||||
svc = _FakeSvc()
|
||||
with pytest.raises(JobNotFoundError):
|
||||
await cancel_job(uuid4(), svc=svc)
|
||||
|
|
@ -0,0 +1,57 @@
|
|||
# tests/unit/test_api_router_success.py
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
from uuid import uuid4, UUID
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from dataloader.api.v1.router import get_status, cancel_job
|
||||
from dataloader.api.v1.schemas import JobStatusResponse
|
||||
|
||||
|
||||
class _SvcOK:
|
||||
async def status(self, job_id: UUID) -> JobStatusResponse | None:
|
||||
return JobStatusResponse(
|
||||
job_id=job_id,
|
||||
status="queued",
|
||||
attempt=0,
|
||||
started_at=None,
|
||||
finished_at=None,
|
||||
heartbeat_at=None,
|
||||
error=None,
|
||||
progress={},
|
||||
)
|
||||
|
||||
async def cancel(self, job_id: UUID) -> JobStatusResponse | None:
|
||||
return JobStatusResponse(
|
||||
job_id=job_id,
|
||||
status="canceled",
|
||||
attempt=1,
|
||||
started_at=datetime.now(timezone.utc),
|
||||
finished_at=datetime.now(timezone.utc),
|
||||
heartbeat_at=None,
|
||||
error="by test",
|
||||
progress={},
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.asyncio
|
||||
async def test_router_get_status_returns_response():
|
||||
svc = _SvcOK()
|
||||
jid = uuid4()
|
||||
res = await get_status(jid, svc=svc)
|
||||
assert isinstance(res, JobStatusResponse)
|
||||
assert res.job_id == jid
|
||||
assert res.status == "queued"
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.asyncio
|
||||
async def test_router_cancel_returns_response():
|
||||
svc = _SvcOK()
|
||||
jid = uuid4()
|
||||
res = await cancel_job(jid, svc=svc)
|
||||
assert isinstance(res, JobStatusResponse)
|
||||
assert res.job_id == jid
|
||||
assert res.status == "canceled"
|
||||
|
|
@ -1,11 +1,14 @@
|
|||
# tests/integration_tests/test_queue_repository.py
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timezone
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from uuid import uuid4
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from dataloader.storage.models import DLJob
|
||||
from dataloader.storage.repositories import QueueRepository
|
||||
from dataloader.storage.schemas import CreateJobRequest, JobStatus
|
||||
|
||||
|
|
@ -451,3 +454,232 @@ class TestQueueRepository:
|
|||
status = await repo.get_status(job_id)
|
||||
assert status is not None
|
||||
assert status.status == "queued"
|
||||
|
||||
async def test_claim_one_fails_on_advisory_lock_and_sets_backoff(
|
||||
self,
|
||||
db_session: AsyncSession,
|
||||
clean_queue_tables,
|
||||
job_id: str,
|
||||
queue_name: str,
|
||||
task_name: str,
|
||||
):
|
||||
"""
|
||||
Проверка ветки отказа advisory-lock: задача возвращается в queued с отложенным available_at.
|
||||
"""
|
||||
repo = QueueRepository(db_session)
|
||||
|
||||
req = CreateJobRequest(
|
||||
job_id=job_id,
|
||||
queue=queue_name,
|
||||
task=task_name,
|
||||
args={"k": "v"},
|
||||
idempotency_key=None,
|
||||
lock_key="lock-fail-adv",
|
||||
partition_key="",
|
||||
priority=10,
|
||||
available_at=datetime.now(timezone.utc),
|
||||
max_attempts=5,
|
||||
lease_ttl_sec=30,
|
||||
producer=None,
|
||||
consumer_group=None,
|
||||
)
|
||||
await repo.create_or_get(req)
|
||||
|
||||
async def _false_lock(_: str) -> bool:
|
||||
return False
|
||||
|
||||
repo._try_advisory_lock = _false_lock # type: ignore[method-assign]
|
||||
|
||||
before = datetime.now(timezone.utc)
|
||||
claimed = await repo.claim_one(queue_name, claim_backoff_sec=15)
|
||||
after = datetime.now(timezone.utc)
|
||||
|
||||
assert claimed is None
|
||||
|
||||
st = await repo.get_status(job_id)
|
||||
assert st is not None
|
||||
assert st.status == "queued"
|
||||
|
||||
row = (await db_session.execute(select(DLJob).where(DLJob.job_id == job_id))).scalar_one()
|
||||
assert row.available_at >= before + timedelta(seconds=15)
|
||||
assert row.available_at <= after + timedelta(seconds=60)
|
||||
|
||||
async def test_heartbeat_when_not_running_returns_false(
|
||||
self,
|
||||
db_session: AsyncSession,
|
||||
clean_queue_tables,
|
||||
job_id: str,
|
||||
queue_name: str,
|
||||
task_name: str,
|
||||
):
|
||||
"""
|
||||
Heartbeat для нерunning задачи возвращает (False, False).
|
||||
"""
|
||||
repo = QueueRepository(db_session)
|
||||
|
||||
req = CreateJobRequest(
|
||||
job_id=job_id,
|
||||
queue=queue_name,
|
||||
task=task_name,
|
||||
args={},
|
||||
idempotency_key=None,
|
||||
lock_key="lock-hb-not-running",
|
||||
partition_key="",
|
||||
priority=100,
|
||||
available_at=datetime.now(timezone.utc),
|
||||
max_attempts=5,
|
||||
lease_ttl_sec=60,
|
||||
producer=None,
|
||||
consumer_group=None,
|
||||
)
|
||||
await repo.create_or_get(req)
|
||||
|
||||
ok, cancel = await repo.heartbeat(job_id, ttl_sec=30)
|
||||
assert ok is False
|
||||
assert cancel is False
|
||||
|
||||
async def test_finish_fail_or_retry_marks_canceled_branch(
|
||||
self,
|
||||
db_session: AsyncSession,
|
||||
clean_queue_tables,
|
||||
job_id: str,
|
||||
queue_name: str,
|
||||
task_name: str,
|
||||
):
|
||||
"""
|
||||
Ветка is_canceled=True помечает задачу как canceled и завершает её.
|
||||
"""
|
||||
repo = QueueRepository(db_session)
|
||||
|
||||
req = CreateJobRequest(
|
||||
job_id=job_id,
|
||||
queue=queue_name,
|
||||
task=task_name,
|
||||
args={},
|
||||
idempotency_key=None,
|
||||
lock_key="lock-cancel",
|
||||
partition_key="",
|
||||
priority=100,
|
||||
available_at=datetime.now(timezone.utc),
|
||||
max_attempts=5,
|
||||
lease_ttl_sec=60,
|
||||
producer=None,
|
||||
consumer_group=None,
|
||||
)
|
||||
await repo.create_or_get(req)
|
||||
await repo.claim_one(queue_name, claim_backoff_sec=5)
|
||||
|
||||
await repo.finish_fail_or_retry(job_id, err="Canceled by test", is_canceled=True)
|
||||
|
||||
st = await repo.get_status(job_id)
|
||||
assert st is not None
|
||||
assert st.status == "canceled"
|
||||
assert st.error == "Canceled by test"
|
||||
assert st.finished_at is not None
|
||||
|
||||
async def test_requeue_lost_no_expired_returns_empty(
|
||||
self,
|
||||
db_session: AsyncSession,
|
||||
clean_queue_tables,
|
||||
job_id: str,
|
||||
queue_name: str,
|
||||
task_name: str,
|
||||
):
|
||||
"""
|
||||
requeue_lost без протухших задач возвращает пустой список.
|
||||
"""
|
||||
repo = QueueRepository(db_session)
|
||||
|
||||
req = CreateJobRequest(
|
||||
job_id=job_id,
|
||||
queue=queue_name,
|
||||
task=task_name,
|
||||
args={},
|
||||
idempotency_key=None,
|
||||
lock_key="lock-none-expired",
|
||||
partition_key="",
|
||||
priority=100,
|
||||
available_at=datetime.now(timezone.utc),
|
||||
max_attempts=5,
|
||||
lease_ttl_sec=120,
|
||||
producer=None,
|
||||
consumer_group=None,
|
||||
)
|
||||
await repo.create_or_get(req)
|
||||
await repo.claim_one(queue_name, claim_backoff_sec=5)
|
||||
|
||||
res = await repo.requeue_lost(now=datetime.now(timezone.utc))
|
||||
assert res == []
|
||||
|
||||
st = await repo.get_status(job_id)
|
||||
assert st is not None
|
||||
assert st.status == "running"
|
||||
|
||||
async def test_private_helpers_resolve_queue_and_advisory_unlock_are_executable(
|
||||
self,
|
||||
db_session: AsyncSession,
|
||||
clean_queue_tables,
|
||||
job_id: str,
|
||||
queue_name: str,
|
||||
task_name: str,
|
||||
):
|
||||
"""
|
||||
Прямые прогоны приватных методов для покрытия редких веток.
|
||||
"""
|
||||
repo = QueueRepository(db_session)
|
||||
|
||||
rq = CreateJobRequest(
|
||||
job_id=job_id,
|
||||
queue=queue_name,
|
||||
task=task_name,
|
||||
args={},
|
||||
idempotency_key=None,
|
||||
lock_key="lock-direct-unlock",
|
||||
partition_key="",
|
||||
priority=1,
|
||||
available_at=datetime.now(timezone.utc),
|
||||
max_attempts=1,
|
||||
lease_ttl_sec=5,
|
||||
producer=None,
|
||||
consumer_group=None,
|
||||
)
|
||||
await repo.create_or_get(rq)
|
||||
|
||||
missing_uuid = str(uuid4())
|
||||
qname = await repo._resolve_queue(missing_uuid) # type: ignore[attr-defined]
|
||||
assert qname == ""
|
||||
|
||||
await repo._advisory_unlock("lock-direct-unlock") # type: ignore[attr-defined]
|
||||
|
||||
async def test_cancel_returns_false_for_nonexistent_job(
|
||||
self,
|
||||
db_session: AsyncSession,
|
||||
clean_queue_tables,
|
||||
):
|
||||
"""
|
||||
Возвращает False при отмене несуществующей задачи.
|
||||
"""
|
||||
repo = QueueRepository(db_session)
|
||||
assert await repo.cancel(str(uuid4())) is False
|
||||
|
||||
async def test_finish_ok_silent_when_job_absent(
|
||||
self,
|
||||
db_session: AsyncSession,
|
||||
clean_queue_tables,
|
||||
):
|
||||
"""
|
||||
Тихо завершается, если задача не найдена.
|
||||
"""
|
||||
repo = QueueRepository(db_session)
|
||||
await repo.finish_ok(str(uuid4()))
|
||||
|
||||
async def test_finish_fail_or_retry_noop_when_job_absent(
|
||||
self,
|
||||
db_session: AsyncSession,
|
||||
clean_queue_tables,
|
||||
):
|
||||
"""
|
||||
Тихо выходит при отсутствии задачи.
|
||||
"""
|
||||
repo = QueueRepository(db_session)
|
||||
await repo.finish_fail_or_retry(str(uuid4()), err="no-op")
|
||||
|
|
|
|||
|
|
@ -444,3 +444,69 @@ class TestPGWorker:
|
|||
results.append(_)
|
||||
|
||||
assert len(results) == 3
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_claim_and_execute_once_handles_shutdown_cancelled_error(self):
|
||||
cfg = WorkerConfig(queue="test", heartbeat_sec=10, claim_backoff_sec=5)
|
||||
stop_event = asyncio.Event()
|
||||
|
||||
with patch("dataloader.workers.base.APP_CTX") as mock_ctx, \
|
||||
patch("dataloader.workers.base.QueueRepository") as mock_repo_cls:
|
||||
|
||||
mock_session = AsyncMock()
|
||||
mock_sm = MagicMock()
|
||||
mock_sm.return_value.__aenter__.return_value = mock_session
|
||||
mock_sm.return_value.__aexit__.return_value = AsyncMock(return_value=False)
|
||||
mock_ctx.get_logger.return_value = Mock()
|
||||
mock_ctx.sessionmaker = mock_sm
|
||||
|
||||
mock_repo = Mock()
|
||||
mock_repo.claim_one = AsyncMock(return_value={
|
||||
"job_id": "test-job-id",
|
||||
"lease_ttl_sec": 60,
|
||||
"task": "test.task",
|
||||
"args": {}
|
||||
})
|
||||
mock_repo.finish_fail_or_retry = AsyncMock()
|
||||
mock_repo_cls.return_value = mock_repo
|
||||
|
||||
worker = PGWorker(cfg, stop_event)
|
||||
|
||||
async def raise_cancel(*_args, **_kwargs):
|
||||
raise asyncio.CancelledError()
|
||||
|
||||
with patch.object(worker, "_execute_with_heartbeat", new=raise_cancel):
|
||||
await worker._claim_and_execute_once()
|
||||
|
||||
mock_repo.finish_fail_or_retry.assert_called_once()
|
||||
args, kwargs = mock_repo.finish_fail_or_retry.call_args
|
||||
assert args[0] == "test-job-id"
|
||||
assert "cancelled by shutdown" in args[1]
|
||||
assert kwargs.get("is_canceled") is True
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execute_with_heartbeat_raises_cancelled_when_stop_set(self):
|
||||
cfg = WorkerConfig(queue="test", heartbeat_sec=1000, claim_backoff_sec=5)
|
||||
stop_event = asyncio.Event()
|
||||
stop_event.set()
|
||||
|
||||
with patch("dataloader.workers.base.APP_CTX") as mock_ctx, \
|
||||
patch("dataloader.workers.base.QueueRepository") as mock_repo_cls:
|
||||
|
||||
mock_ctx.get_logger.return_value = Mock()
|
||||
mock_ctx.sessionmaker = Mock()
|
||||
mock_repo_cls.return_value = Mock()
|
||||
|
||||
worker = PGWorker(cfg, stop_event)
|
||||
|
||||
async def one_yield():
|
||||
yield
|
||||
|
||||
with pytest.raises(asyncio.CancelledError):
|
||||
await worker._execute_with_heartbeat("job-id", 60, one_yield())
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,27 @@
|
|||
# tests/unit/test_workers_reaper.py
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
from unittest.mock import AsyncMock, patch, Mock
|
||||
|
||||
from dataloader.workers.reaper import requeue_lost
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.asyncio
|
||||
async def test_requeue_lost_calls_repository_and_returns_ids():
|
||||
"""
|
||||
Проверяет, что requeue_lost вызывает QueueRepository.requeue_lost и возвращает результат.
|
||||
"""
|
||||
fake_session = Mock()
|
||||
|
||||
with patch("dataloader.workers.reaper.QueueRepository") as repo_cls:
|
||||
repo = Mock()
|
||||
repo.requeue_lost = AsyncMock(return_value=["id1", "id2"])
|
||||
repo_cls.return_value = repo
|
||||
|
||||
res = await requeue_lost(fake_session)
|
||||
|
||||
assert res == ["id1", "id2"]
|
||||
repo_cls.assert_called_once_with(fake_session)
|
||||
repo.requeue_lost.assert_awaited_once()
|
||||
Loading…
Reference in New Issue