Compare commits
4 Commits
b02b4e84fe
...
0e888ec910
| Author | SHA1 | Date |
|---|---|---|
|
|
0e888ec910 | |
|
|
e0829d66f8 | |
|
|
6364c97f21 | |
|
|
309e62c410 |
|
|
@ -0,0 +1,926 @@
|
||||||
|
про бд для OPU - нужно полученные строки грузить также по курсору стримингом в бд, но перед этим надо truncate таблицу делать, DDL вот -
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
Это тестовый скрипт для взаимодействия с OPU
|
||||||
|
Но тут надо сделать модель репозиторий, интерфейс и т.п, в скрипте показано только принцип получения данных.
|
||||||
|
|
||||||
|
# test_export.py
|
||||||
|
"""
|
||||||
|
Скрипт для тестирования экспорта:
|
||||||
|
- запуск задачи
|
||||||
|
- мониторинг статуса
|
||||||
|
- скачивание (полное и по частям)
|
||||||
|
- распаковка
|
||||||
|
- подсчёт строк
|
||||||
|
- замер размеров
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import httpx
|
||||||
|
import zstandard as zstd
|
||||||
|
from loguru import logger
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
# ========= НАСТРОЙКИ =========
|
||||||
|
BASE_URL = "https://ci02533826-tib-brief.apps.ift-terra000024-edm.ocp.delta.sbrf.ru "
|
||||||
|
ENDPOINT_START = "/export/opu/start"
|
||||||
|
ENDPOINT_STATUS = "/export/{job_id}/status"
|
||||||
|
ENDPOINT_DOWNLOAD = "/export/{job_id}/download"
|
||||||
|
|
||||||
|
POLL_INTERVAL = 2
|
||||||
|
TIMEOUT = 3600
|
||||||
|
CHUNK_SIZE = 8192
|
||||||
|
|
||||||
|
OUTPUT_DIR = Path("./export_test")
|
||||||
|
OUTPUT_DIR.mkdir(exist_ok=True)
|
||||||
|
# ============================
|
||||||
|
|
||||||
|
|
||||||
|
def sizeof_fmt(num: int) -> str:
|
||||||
|
"""Форматирует размер файла в человекочитаемый вид."""
|
||||||
|
for unit in ['B', 'KB', 'MB', 'GB']:
|
||||||
|
if num < 1024.0:
|
||||||
|
return f"{num:.1f} {unit}"
|
||||||
|
num /= 1024.0
|
||||||
|
return f"{num:.1f} TB"
|
||||||
|
|
||||||
|
|
||||||
|
async def download_full(client: httpx.AsyncClient, url: str, filepath: Path) -> bool:
|
||||||
|
"""Скачивает файл полностью."""
|
||||||
|
logger.info(f"⬇️ Полная загрузка: {filepath.name}")
|
||||||
|
try:
|
||||||
|
response = await client.get(url, follow_redirects=True)
|
||||||
|
if response.status_code != 200:
|
||||||
|
logger.error(f"❌ Ошибка полной загрузки: {response.status_code} {response.text}")
|
||||||
|
return False
|
||||||
|
with open(filepath, "wb") as f:
|
||||||
|
f.write(response.content)
|
||||||
|
logger.success(f" Полная загрузка завершена: {sizeof_fmt(filepath.stat().st_size)}")
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"❌ Ошибка при полной загрузке: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
async def download_range(
|
||||||
|
client: httpx.AsyncClient,
|
||||||
|
url: str,
|
||||||
|
filepath: Path,
|
||||||
|
start: int,
|
||||||
|
end: int | None = None
|
||||||
|
) -> bool:
|
||||||
|
"""Скачивает диапазон байтов."""
|
||||||
|
headers = {"Range": f"bytes={start}-{end if end else ''}"}
|
||||||
|
logger.info(f"⬇️ Загрузка диапазона {start}-{end if end else 'end'} -> {filepath.name}")
|
||||||
|
try:
|
||||||
|
response = await client.get(url, headers=headers, follow_redirects=True)
|
||||||
|
if response.status_code != 206:
|
||||||
|
logger.error(f"❌ Ожидался 206 Partial Content, получен: {response.status_code}")
|
||||||
|
return False
|
||||||
|
with open(filepath, "wb") as f:
|
||||||
|
f.write(response.content)
|
||||||
|
logger.success(f" Диапазон сохранён: {sizeof_fmt(filepath.stat().st_size)} байт")
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"❌ Ошибка при загрузке диапазона: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
logger.info("🚀 Запуск теста экспорта данных")
|
||||||
|
cert_path = r"C:\Users\23193453\Documents\code\cert\client_cert.pem"
|
||||||
|
key_path = r"C:\Users\23193453\Documents\code\cert\client_cert.key"
|
||||||
|
ca_path = r"C:\Users\23193453\Documents\code\cert\client_ca.pem"
|
||||||
|
|
||||||
|
async with httpx.AsyncClient(
|
||||||
|
cert=(cert_path, key_path),
|
||||||
|
verify=ca_path,
|
||||||
|
timeout=TIMEOUT
|
||||||
|
) as client:
|
||||||
|
try:
|
||||||
|
# --- Шаг 1: Запуск задачи ---
|
||||||
|
logger.info("📨 Отправка запроса на запуск экспорта OPU...")
|
||||||
|
response = await client.post(BASE_URL + ENDPOINT_START)
|
||||||
|
if response.status_code != 200:
|
||||||
|
logger.error(f"❌ Ошибка запуска задачи: {response.status_code} {response.text}")
|
||||||
|
return
|
||||||
|
job_id = response.json()["job_id"]
|
||||||
|
logger.info(f" Задача запущена: job_id={job_id}")
|
||||||
|
|
||||||
|
# --- Шаг 2: Мониторинг статуса ---
|
||||||
|
status_url = ENDPOINT_STATUS.format(job_id=job_id)
|
||||||
|
logger.info("⏳ Ожидание завершения задачи...")
|
||||||
|
start_wait = asyncio.get_event_loop().time()
|
||||||
|
|
||||||
|
while True:
|
||||||
|
response = await client.get(BASE_URL + status_url)
|
||||||
|
if response.status_code != 200:
|
||||||
|
logger.warning(f"⚠️ Ошибка при получении статуса: {response.status_code}")
|
||||||
|
await asyncio.sleep(POLL_INTERVAL)
|
||||||
|
continue
|
||||||
|
|
||||||
|
status_data = response.json()
|
||||||
|
status = status_data["status"]
|
||||||
|
total_rows = status_data["total_rows"]
|
||||||
|
|
||||||
|
elapsed = asyncio.get_event_loop().time() - start_wait
|
||||||
|
logger.debug(f"📊 Статус: {status}, строк обработано: {total_rows}, прошло: {elapsed:.1f} с")
|
||||||
|
|
||||||
|
if status == "completed":
|
||||||
|
logger.info(f"🎉 Задача завершена! Обработано строк: {total_rows}")
|
||||||
|
break
|
||||||
|
elif status == "failed":
|
||||||
|
logger.error(f"💥 Задача завершилась с ошибкой: {status_data['error']}")
|
||||||
|
return
|
||||||
|
elif status in ("pending", "running"):
|
||||||
|
await asyncio.sleep(POLL_INTERVAL)
|
||||||
|
continue
|
||||||
|
|
||||||
|
download_url = BASE_URL + ENDPOINT_DOWNLOAD.format(job_id=job_id)
|
||||||
|
|
||||||
|
# --- Тест 1: Полная загрузка ---
|
||||||
|
full_path = OUTPUT_DIR / f"full_export_{job_id}.jsonl.zst"
|
||||||
|
if not await download_full(client, download_url, full_path):
|
||||||
|
return
|
||||||
|
|
||||||
|
# --- Тест 2: Первые 1024 байта ---
|
||||||
|
range_path = OUTPUT_DIR / f"range_head_{job_id}.bin"
|
||||||
|
if not await download_range(client, download_url, range_path, start=0, end=1023):
|
||||||
|
return
|
||||||
|
|
||||||
|
# --- Тест 3: Возобновление с 1024 байта ---
|
||||||
|
resume_path = OUTPUT_DIR / f"range_resume_{job_id}.bin"
|
||||||
|
if not await download_range(client, download_url, resume_path, start=1024):
|
||||||
|
return
|
||||||
|
|
||||||
|
# --- Анализ полного архива ---
|
||||||
|
archive_size = full_path.stat().st_size
|
||||||
|
logger.success(f"📦 Полный архив: {sizeof_fmt(archive_size)}")
|
||||||
|
|
||||||
|
# --- Распаковка ---
|
||||||
|
unpacked_path = OUTPUT_DIR / f"export_{job_id}.jsonl"
|
||||||
|
logger.info(f"📦 Распаковка архива в: {unpacked_path.name}")
|
||||||
|
dctx = zstd.ZstdDecompressor()
|
||||||
|
try:
|
||||||
|
with open(full_path, "rb") as compressed:
|
||||||
|
with open(unpacked_path, "wb") as dest:
|
||||||
|
dctx.copy_stream(compressed, dest)
|
||||||
|
unpacked_size = unpacked_path.stat().st_size
|
||||||
|
logger.success(f" Распаковано: {sizeof_fmt(unpacked_size)}")
|
||||||
|
|
||||||
|
# --- Подсчёт строк ---
|
||||||
|
logger.info("🧮 Подсчёт строк в распакованном файле...")
|
||||||
|
with open(unpacked_path, "rb") as f:
|
||||||
|
line_count = sum(1 for _ in f)
|
||||||
|
logger.success(f" Файл содержит {line_count:,} строк")
|
||||||
|
|
||||||
|
# --- Итог ---
|
||||||
|
logger.info("📈 ИТОГИ:")
|
||||||
|
logger.info(f" Архив: {sizeof_fmt(archive_size)}")
|
||||||
|
logger.info(f" Распаковано: {sizeof_fmt(unpacked_size)}")
|
||||||
|
logger.info(f" Коэффициент сжатия: {archive_size / unpacked_size:.2f}x")
|
||||||
|
logger.info(f" Строк: {line_count:,}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.exception(f"❌ Ошибка при распаковке: {e}")
|
||||||
|
|
||||||
|
logger.info("🏁 Все тесты завершены успешно!")
|
||||||
|
|
||||||
|
except httpx.ConnectError as e:
|
||||||
|
logger.critical(f"❌ Не удалось подключиться к серверу. Убедитесь, что сервис запущен. {e}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.exception(f"❌ Неожиданная ошибка: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
asyncio.run(main())
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
А от код сервиса, который отдает данные OPU -
|
||||||
|
|
||||||
|
а в отдающем сервисе OPU такой код -
|
||||||
|
service\src\gmap2\models\base.py -
|
||||||
|
"""
|
||||||
|
Модуль базовых моделей для ORM.
|
||||||
|
|
||||||
|
Содержит базовый класс и миксин для динамического определения имён таблиц
|
||||||
|
на основе конфигурации приложения.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from sqlalchemy.orm import declarative_base, declared_attr
|
||||||
|
|
||||||
|
from gmap2.config import APP_CONFIG
|
||||||
|
|
||||||
|
TABLE_NAME_MAPPING = {
|
||||||
|
"opudata": APP_CONFIG.gp.opu_table,
|
||||||
|
"lprnewsdata": APP_CONFIG.gp.lpr_news_table,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
BASE = declarative_base()
|
||||||
|
|
||||||
|
|
||||||
|
class DynamicTableMixin: # pylint: disable=too-few-public-methods
|
||||||
|
"""
|
||||||
|
Миксин для динамического определения имени таблицы через __tablename__.
|
||||||
|
|
||||||
|
Имя таблицы определяется по имени класса в нижнем регистре.
|
||||||
|
Если имя класса присутствует в TABLE_NAME_MAPPING, используется значение из маппинга.
|
||||||
|
В противном случае используется имя класса как имя таблицы.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@declared_attr
|
||||||
|
def __tablename__(cls) -> str: # pylint: disable=no-self-argument
|
||||||
|
"""
|
||||||
|
Динамически возвращает имя таблицы для модели.
|
||||||
|
|
||||||
|
Использует маппинг из конфигурации, если имя класса известно.
|
||||||
|
В противном случае возвращает имя класса в нижнем регистре.
|
||||||
|
|
||||||
|
:return: Имя таблицы, используемое SQLAlchemy при работе с моделью.
|
||||||
|
"""
|
||||||
|
class_name = cls.__name__.lower()
|
||||||
|
return TABLE_NAME_MAPPING.get(class_name, class_name)
|
||||||
|
|
||||||
|
service\src\gmap2\models\opu.py -
|
||||||
|
"""
|
||||||
|
ORM-модель для таблицы OPU.
|
||||||
|
|
||||||
|
Замечание:
|
||||||
|
В БД отсутствует PRIMARY KEY. Для корректной работы SQLAlchemy
|
||||||
|
используется виртуальный составной первичный ключ (wf_row_id, wf_load_id).
|
||||||
|
Это не влияет на экспорт, но требуется для итерации через ORM.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from sqlalchemy import Column, Date, DateTime, Float, Integer, String
|
||||||
|
|
||||||
|
from .base import BASE, DynamicTableMixin
|
||||||
|
|
||||||
|
|
||||||
|
class OPUData(DynamicTableMixin, BASE): # pylint: disable=too-few-public-methods
|
||||||
|
"""
|
||||||
|
Данные из таблицы brief_opu.
|
||||||
|
"""
|
||||||
|
|
||||||
|
__table_args__ = {"schema": "brief_opu_schema"}
|
||||||
|
|
||||||
|
wf_row_id = Column(Integer, primary_key=True, nullable=False)
|
||||||
|
wf_load_id = Column(Integer, primary_key=True, nullable=False)
|
||||||
|
|
||||||
|
object_id = Column(String)
|
||||||
|
object_nm = Column(String)
|
||||||
|
desk_nm = Column(String)
|
||||||
|
object_tp = Column(String)
|
||||||
|
object_unit = Column(String)
|
||||||
|
actdate = Column(Date)
|
||||||
|
layer_cd = Column(String)
|
||||||
|
layer_nm = Column(String)
|
||||||
|
measure = Column(String)
|
||||||
|
opu_cd = Column(String)
|
||||||
|
opu_nm_sh = Column(String)
|
||||||
|
opu_nm = Column(String)
|
||||||
|
opu_lvl = Column(Integer)
|
||||||
|
product_nm = Column(String)
|
||||||
|
opu_prnt_cd = Column(String)
|
||||||
|
opu_prnt_nm_sh = Column(String)
|
||||||
|
opu_prnt_nm = Column(String)
|
||||||
|
product_prnt_nm = Column(String)
|
||||||
|
sum_amountrub_p_usd = Column(Float)
|
||||||
|
wf_load_dttm = Column(DateTime)
|
||||||
|
|
||||||
|
service\src\gmap2\repositories\base.py -
|
||||||
|
"""
|
||||||
|
Базовый репозиторий с общим функционалом стриминга.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from collections.abc import AsyncGenerator, Callable
|
||||||
|
from typing import Generic, List, Optional, TypeVar, Union
|
||||||
|
from typing_extensions import Self
|
||||||
|
|
||||||
|
from loguru import logger
|
||||||
|
from sqlalchemy import Select, select
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
|
_T_Model = TypeVar("_T_Model") # pylint: disable=invalid-name
|
||||||
|
|
||||||
|
|
||||||
|
class BaseRepository(Generic[_T_Model]):
|
||||||
|
"""
|
||||||
|
Абстрактный репозиторий с поддержкой стриминга.
|
||||||
|
"""
|
||||||
|
|
||||||
|
model: type[_T_Model]
|
||||||
|
default_order_by: Union[List[Callable[[], List]], List] = []
|
||||||
|
|
||||||
|
def __init__(self, session: AsyncSession) -> None:
|
||||||
|
"""
|
||||||
|
Инициализирует репозиторий.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
session: Асинхронная сессия SQLAlchemy.
|
||||||
|
"""
|
||||||
|
self.session = session
|
||||||
|
|
||||||
|
async def stream_all(
|
||||||
|
self,
|
||||||
|
chunk_size: int = 10_000,
|
||||||
|
statement: Optional[Select[tuple[_T_Model]]] = None,
|
||||||
|
) -> AsyncGenerator[list[_T_Model], None]:
|
||||||
|
"""
|
||||||
|
Потоково извлекает записи из БД пачками.
|
||||||
|
|
||||||
|
Выполняет стриминг данных с использованием асинхронного курсора.
|
||||||
|
Поддерживает кастомный запрос и автоматическое упорядочивание.
|
||||||
|
|
||||||
|
:param chunk_size: Размер пачки записей для одной итерации.
|
||||||
|
:param statement: Опциональный SQL-запрос.
|
||||||
|
:yield: Список экземпляров модели, загруженных порциями.
|
||||||
|
"""
|
||||||
|
logger.info(f"Streaming {self.model.__name__} in batches of {chunk_size}")
|
||||||
|
|
||||||
|
stmt = statement or select(self.model)
|
||||||
|
|
||||||
|
if not statement and self.default_order_by:
|
||||||
|
stmt = stmt.order_by(*self.default_order_by)
|
||||||
|
|
||||||
|
try:
|
||||||
|
result = await self.session.stream(
|
||||||
|
stmt.execution_options(
|
||||||
|
stream_results=True,
|
||||||
|
max_row_count=chunk_size,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
partitions_method = result.partitions
|
||||||
|
partitions_result = partitions_method(chunk_size)
|
||||||
|
|
||||||
|
async for partition in partitions_result:
|
||||||
|
items = [row[0] for row in partition]
|
||||||
|
yield items
|
||||||
|
|
||||||
|
logger.info(f"Finished streaming {self.model.__name__}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(
|
||||||
|
f"Error during streaming {self.model.__name__}: {type(e).__name__}: {e}"
|
||||||
|
)
|
||||||
|
raise
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def create(cls, session: AsyncSession) -> Self:
|
||||||
|
"""
|
||||||
|
Фабричный метод.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
session: Асинхронная сессия.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Экземпляр репозитория.
|
||||||
|
"""
|
||||||
|
return cls(session=session)
|
||||||
|
|
||||||
|
service\src\gmap2\repositories\opu_repository.py -
|
||||||
|
"""
|
||||||
|
Репозиторий для OPU.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from gmap2.models.opu import OPUData
|
||||||
|
|
||||||
|
from .base import BaseRepository
|
||||||
|
|
||||||
|
|
||||||
|
class OPURepository(BaseRepository[OPUData]):
|
||||||
|
"""
|
||||||
|
Репозиторий для OPUData.
|
||||||
|
"""
|
||||||
|
|
||||||
|
model = OPUData
|
||||||
|
default_order_by = [
|
||||||
|
OPUData.wf_load_id,
|
||||||
|
OPUData.wf_row_id,
|
||||||
|
]
|
||||||
|
|
||||||
|
service\src\gmap2\api\v1\routes.py -
|
||||||
|
"""
|
||||||
|
Маршруты API для запуска, статуса и скачивания экспорта.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from fastapi import APIRouter, BackgroundTasks, HTTPException, Request
|
||||||
|
|
||||||
|
from gmap2.services.job.background_worker import (
|
||||||
|
run_lpr_news_export_job,
|
||||||
|
run_opu_export_job,
|
||||||
|
)
|
||||||
|
from gmap2.services.job.job_manager import JobManager
|
||||||
|
from gmap2.utils.file_utils import create_range_aware_response
|
||||||
|
|
||||||
|
from .schemas import ExportJobStatus, StartExportResponse
|
||||||
|
|
||||||
|
router = APIRouter(prefix="/export", tags=["export"])
|
||||||
|
|
||||||
|
|
||||||
|
def get_job_manager(request: Request) -> JobManager:
|
||||||
|
"""
|
||||||
|
Извлекает JobManager из application state.
|
||||||
|
"""
|
||||||
|
return request.app.state.job_manager
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/opu/start", response_model=StartExportResponse)
|
||||||
|
async def start_opu_export(
|
||||||
|
request: Request, background_tasks: BackgroundTasks
|
||||||
|
) -> StartExportResponse:
|
||||||
|
"""
|
||||||
|
Запускает фоновую задачу экспорта данных OPU.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Идентификатор задачи.
|
||||||
|
"""
|
||||||
|
job_manager = get_job_manager(request)
|
||||||
|
job_id = job_manager.start_job("opu")
|
||||||
|
|
||||||
|
background_tasks.add_task(run_opu_export_job, job_id, job_manager)
|
||||||
|
|
||||||
|
return StartExportResponse(job_id=job_id)
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/lpr-news/start", response_model=StartExportResponse)
|
||||||
|
async def start_lpr_news_export(
|
||||||
|
request: Request, background_tasks: BackgroundTasks
|
||||||
|
) -> StartExportResponse:
|
||||||
|
"""
|
||||||
|
Запускает фоновую задачу экспорта данных LPR News.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Идентификатор задачи.
|
||||||
|
"""
|
||||||
|
job_manager = get_job_manager(request)
|
||||||
|
job_id = job_manager.start_job("lpr_news")
|
||||||
|
|
||||||
|
background_tasks.add_task(run_lpr_news_export_job, job_id, job_manager)
|
||||||
|
|
||||||
|
return StartExportResponse(job_id=job_id)
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/{job_id}/status", response_model=ExportJobStatus)
|
||||||
|
async def get_export_status(job_id: str, request: Request) -> ExportJobStatus:
|
||||||
|
"""
|
||||||
|
Возвращает текущий статус задачи экспорта.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
job_id: Идентификатор задачи.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Статус задачи.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
HTTPException 404: Если задача не найдена.
|
||||||
|
"""
|
||||||
|
job_manager = get_job_manager(request)
|
||||||
|
status = job_manager.get_job_status(job_id)
|
||||||
|
if not status:
|
||||||
|
raise HTTPException(status_code=404, detail="Job not found")
|
||||||
|
return ExportJobStatus(**status)
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/{job_id}/download")
|
||||||
|
async def download_export(job_id: str, request: Request):
|
||||||
|
"""
|
||||||
|
Стримит сжатый файл экспорта клиенту с поддержкой Range-запросов.
|
||||||
|
"""
|
||||||
|
job_manager = get_job_manager(request)
|
||||||
|
status = job_manager.get_job_status(job_id)
|
||||||
|
|
||||||
|
if not status:
|
||||||
|
raise HTTPException(status_code=404, detail="Job not found")
|
||||||
|
|
||||||
|
if status["status"] != "completed":
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=409, detail=f"Job is {status['status']}, not completed"
|
||||||
|
)
|
||||||
|
|
||||||
|
file_path = status.get("temp_file_path")
|
||||||
|
if not file_path or not file_path.endswith(".jsonl.zst"):
|
||||||
|
raise HTTPException(status_code=500, detail="Export file not available")
|
||||||
|
|
||||||
|
filename = f"export_{job_id}.jsonl.zst"
|
||||||
|
|
||||||
|
return await create_range_aware_response(
|
||||||
|
file_path=file_path,
|
||||||
|
filename=filename,
|
||||||
|
request=request,
|
||||||
|
media_type="application/octet-stream",
|
||||||
|
)
|
||||||
|
|
||||||
|
service\src\gmap2\services\export\export_service.py -
|
||||||
|
"""
|
||||||
|
Сервис экспорта данных: объединяет репозиторий, форматирование и сжатие.
|
||||||
|
Формат: JSON Lines + Zstandard (.jsonl.zst), один непрерывный zstd-фрейм.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import math
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
import threading
|
||||||
|
from collections.abc import AsyncGenerator, Callable
|
||||||
|
from contextlib import asynccontextmanager
|
||||||
|
from datetime import date, datetime
|
||||||
|
from typing import Any, Tuple
|
||||||
|
|
||||||
|
import aiofiles
|
||||||
|
import zstandard as zstd
|
||||||
|
from loguru import logger
|
||||||
|
from orjson import OPT_NAIVE_UTC, OPT_UTC_Z # pylint: disable=no-name-in-module
|
||||||
|
from orjson import dumps as orjson_dumps # pylint: disable=no-name-in-module
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
|
from gmap2.repositories import LPRNewsRepository, OPURepository
|
||||||
|
|
||||||
|
from ..job.job_manager import JobManager
|
||||||
|
from .formatters import models_to_dicts
|
||||||
|
|
||||||
|
|
||||||
|
class _ZstdAsyncSink:
|
||||||
|
"""
|
||||||
|
Потокобезопасный приёмник для zstd.stream_writer.
|
||||||
|
|
||||||
|
Собирает сжатые данные по чанкам в памяти с использованием блокировки.
|
||||||
|
Предназначен для использования в асинхронном контексте,
|
||||||
|
где сжатие выполняется в отдельном потоке.
|
||||||
|
"""
|
||||||
|
|
||||||
|
__slots__ = ("_chunks", "_lock")
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
"""
|
||||||
|
Инициализирует приёмник с пустым списком чанков и блокировкой.
|
||||||
|
"""
|
||||||
|
self._chunks: list[bytes] = []
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
|
||||||
|
def write(self, b: bytes) -> int:
|
||||||
|
"""
|
||||||
|
Записывает байтовый фрагмент в буфер.
|
||||||
|
|
||||||
|
Метод потокобезопасен.
|
||||||
|
Копирует данные и добавляет в внутренний список чанков.
|
||||||
|
|
||||||
|
:param b: Байтовые данные для записи.
|
||||||
|
:return: Длина записанных данных.
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
self._chunks.append(bytes(b))
|
||||||
|
return len(b)
|
||||||
|
|
||||||
|
def drain(self) -> list[bytes]:
|
||||||
|
"""
|
||||||
|
Извлекает все накопленные чанки, сбрасывая внутренний буфер.
|
||||||
|
|
||||||
|
Метод потокобезопасен.
|
||||||
|
Возвращает список всех чанков, записанных с момента последнего сброса.
|
||||||
|
|
||||||
|
:return: Список байтовых фрагментов.
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
chunks = self._chunks
|
||||||
|
self._chunks = []
|
||||||
|
return chunks
|
||||||
|
|
||||||
|
|
||||||
|
class ExportService:
|
||||||
|
"""
|
||||||
|
Основной сервис экспорта данных в формате JSON Lines + zstd.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, chunk_size: int = 10_000, zstd_level: int = 3) -> None:
|
||||||
|
self.chunk_size = chunk_size
|
||||||
|
self.zstd_level = zstd_level
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def _export_to_zstd( # pylint: disable=too-many-arguments,too-many-locals
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
session: AsyncSession,
|
||||||
|
job_id: str,
|
||||||
|
job_manager: JobManager,
|
||||||
|
repo_factory: Callable[[AsyncSession], Any],
|
||||||
|
label: str,
|
||||||
|
temp_dir: str | None = None,
|
||||||
|
) -> AsyncGenerator[Tuple[AsyncGenerator[bytes, None], str], None]:
|
||||||
|
repo = repo_factory(session)
|
||||||
|
file_path: str | None = None
|
||||||
|
try:
|
||||||
|
with tempfile.NamedTemporaryFile(
|
||||||
|
delete=False, suffix=".jsonl.zst", dir=temp_dir
|
||||||
|
) as tmp:
|
||||||
|
file_path = tmp.name
|
||||||
|
|
||||||
|
logger.info(f"[export] {label}: start -> {file_path}")
|
||||||
|
|
||||||
|
cctx = zstd.ZstdCompressor(level=self.zstd_level)
|
||||||
|
sink = _ZstdAsyncSink()
|
||||||
|
|
||||||
|
async with aiofiles.open(file_path, "wb") as f:
|
||||||
|
writer = cctx.stream_writer(sink)
|
||||||
|
try:
|
||||||
|
async for batch in repo.stream_all(chunk_size=self.chunk_size):
|
||||||
|
if not batch:
|
||||||
|
continue
|
||||||
|
dicts = models_to_dicts(batch)
|
||||||
|
payload = ("\n".join(_dumps(d) for d in dicts) + "\n").encode(
|
||||||
|
"utf-8"
|
||||||
|
)
|
||||||
|
|
||||||
|
await asyncio.to_thread(writer.write, payload)
|
||||||
|
|
||||||
|
for chunk in sink.drain():
|
||||||
|
await f.write(chunk)
|
||||||
|
|
||||||
|
job_manager.increment_rows(job_id, len(batch))
|
||||||
|
|
||||||
|
await asyncio.to_thread(writer.flush, zstd.FLUSH_FRAME)
|
||||||
|
|
||||||
|
for chunk in sink.drain():
|
||||||
|
await f.write(chunk)
|
||||||
|
finally:
|
||||||
|
await asyncio.to_thread(writer.close)
|
||||||
|
for chunk in sink.drain():
|
||||||
|
await f.write(chunk)
|
||||||
|
await f.flush()
|
||||||
|
|
||||||
|
logger.info(f"[export] {label}: done -> {file_path}")
|
||||||
|
yield _stream_file(file_path), file_path
|
||||||
|
|
||||||
|
except Exception:
|
||||||
|
if file_path and os.path.exists(file_path):
|
||||||
|
await asyncio.to_thread(os.remove, file_path)
|
||||||
|
logger.exception(f"[export] {label}: failed, temporary file removed")
|
||||||
|
raise
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def export_opu_to_zstd(
|
||||||
|
self,
|
||||||
|
session: AsyncSession,
|
||||||
|
job_id: str,
|
||||||
|
job_manager: JobManager,
|
||||||
|
temp_dir: str | None = None,
|
||||||
|
) -> AsyncGenerator[Tuple[AsyncGenerator[bytes, None], str], None]:
|
||||||
|
"""
|
||||||
|
Экспорт OPU в один непрерывный zstd-поток.
|
||||||
|
"""
|
||||||
|
async with self._export_to_zstd(
|
||||||
|
session=session,
|
||||||
|
job_id=job_id,
|
||||||
|
job_manager=job_manager,
|
||||||
|
repo_factory=OPURepository,
|
||||||
|
label="OPU",
|
||||||
|
temp_dir=temp_dir,
|
||||||
|
) as ctx:
|
||||||
|
yield ctx
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def export_lpr_news_to_zstd(
|
||||||
|
self,
|
||||||
|
session: AsyncSession,
|
||||||
|
job_id: str,
|
||||||
|
job_manager: JobManager,
|
||||||
|
temp_dir: str | None = None,
|
||||||
|
) -> AsyncGenerator[Tuple[AsyncGenerator[bytes, None], str], None]:
|
||||||
|
"""
|
||||||
|
Экспорт LPR News в один непрерывный zstd-поток.
|
||||||
|
"""
|
||||||
|
async with self._export_to_zstd(
|
||||||
|
session=session,
|
||||||
|
job_id=job_id,
|
||||||
|
job_manager=job_manager,
|
||||||
|
repo_factory=LPRNewsRepository,
|
||||||
|
label="LPR News",
|
||||||
|
temp_dir=temp_dir,
|
||||||
|
) as ctx:
|
||||||
|
yield ctx
|
||||||
|
|
||||||
|
|
||||||
|
def _dumps(data: dict) -> str:
|
||||||
|
"""
|
||||||
|
Сериализует словарь в JSON-строку (orjson).
|
||||||
|
"""
|
||||||
|
|
||||||
|
return orjson_dumps(
|
||||||
|
data,
|
||||||
|
default=_serialize_value,
|
||||||
|
option=OPT_UTC_Z | OPT_NAIVE_UTC,
|
||||||
|
).decode("utf-8")
|
||||||
|
|
||||||
|
|
||||||
|
def _serialize_value(value: Any) -> Any:
|
||||||
|
"""
|
||||||
|
Преобразует значения к JSON-совместимому виду.
|
||||||
|
"""
|
||||||
|
if isinstance(value, (datetime, date)):
|
||||||
|
return value.isoformat()
|
||||||
|
if isinstance(value, float) and not math.isfinite(value):
|
||||||
|
return None
|
||||||
|
return value
|
||||||
|
|
||||||
|
|
||||||
|
async def _stream_file(
|
||||||
|
file_path: str, chunk_size: int = 8192
|
||||||
|
) -> AsyncGenerator[bytes, None]:
|
||||||
|
"""
|
||||||
|
Асинхронно читает файл блоками.
|
||||||
|
"""
|
||||||
|
async with aiofiles.open(file_path, "rb") as f:
|
||||||
|
while chunk := await f.read(chunk_size):
|
||||||
|
yield chunk
|
||||||
|
|
||||||
|
service\src\gmap2\services\export\compressors.py -
|
||||||
|
"""
|
||||||
|
Работа со сжатием данных с использованием zstandard.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from typing import BinaryIO
|
||||||
|
|
||||||
|
import zstandard as zstd
|
||||||
|
|
||||||
|
|
||||||
|
def create_zstd_writer(fileobj: BinaryIO, level: int = 3) -> zstd.ZstdCompressionWriter:
|
||||||
|
"""
|
||||||
|
Создаёт сжатый writer поверх бинарного файла.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
fileobj: Целевой файл (например, tempfile).
|
||||||
|
level: Уровень сжатия (1–10). По умолчанию 3 - баланс скорости и размера.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Объект для записи сжатых данных.
|
||||||
|
"""
|
||||||
|
cctx = zstd.ZstdCompressor(level=level)
|
||||||
|
return cctx.stream_writer(fileobj)
|
||||||
|
|
||||||
|
service\src\gmap2\services\export\formatters.py -
|
||||||
|
"""
|
||||||
|
Форматирование ORM-объектов в словари.
|
||||||
|
С оптимизацией: кеширование структуры модели.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from datetime import date, datetime
|
||||||
|
from decimal import Decimal
|
||||||
|
from typing import Any, Dict, List
|
||||||
|
from weakref import WeakKeyDictionary
|
||||||
|
|
||||||
|
from sqlalchemy import inspect
|
||||||
|
from sqlalchemy.orm import RelationshipProperty
|
||||||
|
|
||||||
|
_columns_cache: WeakKeyDictionary = WeakKeyDictionary()
|
||||||
|
|
||||||
|
|
||||||
|
def _get_model_columns(model_instance: Any) -> List[str]:
|
||||||
|
"""
|
||||||
|
Возвращает список имен колонок модели (без отношений).
|
||||||
|
Кеширует результат.
|
||||||
|
"""
|
||||||
|
model_class = model_instance.__class__
|
||||||
|
if model_class not in _columns_cache:
|
||||||
|
mapper = inspect(model_class)
|
||||||
|
columns = [
|
||||||
|
attr.key
|
||||||
|
for attr in mapper.attrs
|
||||||
|
if not isinstance(attr, RelationshipProperty)
|
||||||
|
]
|
||||||
|
_columns_cache[model_class] = columns
|
||||||
|
return _columns_cache[model_class]
|
||||||
|
|
||||||
|
|
||||||
|
def _serialize_value(value: Any) -> Any:
|
||||||
|
"""
|
||||||
|
Сериализует значение в JSON-совместимый формат.
|
||||||
|
"""
|
||||||
|
if isinstance(value, (datetime, date)):
|
||||||
|
return value.isoformat()
|
||||||
|
if isinstance(value, Decimal):
|
||||||
|
return float(value)
|
||||||
|
if value is None:
|
||||||
|
return None
|
||||||
|
return value
|
||||||
|
|
||||||
|
|
||||||
|
def models_to_dicts(instances: List[Any]) -> List[Dict[str, Any]]:
|
||||||
|
"""
|
||||||
|
Массовое преобразование списка ORM-объектов в словари.
|
||||||
|
|
||||||
|
Использует кеширование структуры модели для высокой производительности.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
instances: Список ORM-объектов.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Список словарей.
|
||||||
|
"""
|
||||||
|
if not instances:
|
||||||
|
return []
|
||||||
|
|
||||||
|
columns = _get_model_columns(instances[0])
|
||||||
|
return [
|
||||||
|
{col: _serialize_value(getattr(obj, col)) for col in columns}
|
||||||
|
for obj in instances
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"models_to_dicts",
|
||||||
|
]
|
||||||
|
|
||||||
|
service\src\gmap2\services\job\background_worker.py -
|
||||||
|
"""
|
||||||
|
Фоновые задачи экспорта.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from psycopg import errors as pg_errors
|
||||||
|
from loguru import logger
|
||||||
|
from sqlalchemy.exc import DatabaseError, OperationalError
|
||||||
|
from tenacity import (
|
||||||
|
retry,
|
||||||
|
retry_if_exception_type,
|
||||||
|
stop_after_attempt,
|
||||||
|
wait_exponential,
|
||||||
|
)
|
||||||
|
|
||||||
|
from gmap2.context import APP_CTX
|
||||||
|
from gmap2.services.export.export_service import ExportService
|
||||||
|
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
TRANSIENT_ERRORS = (
|
||||||
|
OperationalError,
|
||||||
|
DatabaseError,
|
||||||
|
ConnectionError,
|
||||||
|
TimeoutError,
|
||||||
|
pg_errors.ConnectionException,
|
||||||
|
pg_errors.AdminShutdown,
|
||||||
|
pg_errors.CannotConnectNow,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _create_export_job(
|
||||||
|
export_method_name: str,
|
||||||
|
job_type: str,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Фабрика фоновых задач экспорта.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
export_method_name: Название метода экспорта в ExportService.
|
||||||
|
job_type: Тип задачи ("opu", "lpr_news").
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Асинхронная функция-задача.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@retry(
|
||||||
|
stop=stop_after_attempt(10),
|
||||||
|
wait=wait_exponential(multiplier=1, max=60),
|
||||||
|
retry=retry_if_exception_type(TRANSIENT_ERRORS),
|
||||||
|
reraise=True,
|
||||||
|
)
|
||||||
|
async def run_export_job(
|
||||||
|
job_id: str, job_manager, temp_dir: Optional[str] = None
|
||||||
|
) -> None:
|
||||||
|
export_service = ExportService()
|
||||||
|
try:
|
||||||
|
async with APP_CTX.get_db_session() as session:
|
||||||
|
job_manager.mark_running(job_id)
|
||||||
|
|
||||||
|
export_method = getattr(export_service, export_method_name)
|
||||||
|
async with export_method(
|
||||||
|
session=session,
|
||||||
|
job_id=job_id,
|
||||||
|
job_manager=job_manager,
|
||||||
|
temp_dir=temp_dir,
|
||||||
|
) as (stream, file_path):
|
||||||
|
async for _ in stream:
|
||||||
|
pass
|
||||||
|
|
||||||
|
job_manager.mark_completed(job_id, file_path)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
if not isinstance(e, TRANSIENT_ERRORS):
|
||||||
|
job_manager.mark_failed(job_id, f"{type(e).__name__}: {e}")
|
||||||
|
logger.exception(f"Export job {job_id} ({job_type}) failed")
|
||||||
|
raise
|
||||||
|
|
||||||
|
run_export_job.__name__ = f"run_{job_type}_export_job"
|
||||||
|
run_export_job.__doc__ = f"Фоновая задача экспорта {job_type.upper()} с retry."
|
||||||
|
|
||||||
|
return run_export_job
|
||||||
|
|
||||||
|
|
||||||
|
run_opu_export_job = _create_export_job("export_opu_to_zstd", "opu")
|
||||||
|
run_lpr_news_export_job = _create_export_job("export_lpr_news_to_zstd", "lpr_news")
|
||||||
File diff suppressed because it is too large
Load Diff
524
README.md
524
README.md
|
|
@ -153,19 +153,108 @@ src/dataloader/
|
||||||
|
|
||||||
## HTTP API (v1)
|
## HTTP API (v1)
|
||||||
|
|
||||||
- POST `/api/v1/jobs/trigger`
|
### POST `/api/v1/jobs/trigger`
|
||||||
- Вход: `{queue, task, args?, idempotency_key?, lock_key, partition_key?, priority?, available_at?, max_attempts?, lease_ttl_sec?, producer?, consumer_group?}`
|
|
||||||
- Выход: `{job_id: UUID, status: str}`
|
|
||||||
- Идемпотентность по `idempotency_key` (если задан).
|
|
||||||
|
|
||||||
- GET `/api/v1/jobs/{job_id}/status`
|
Постановка задачи в очередь (идемпотентная операция).
|
||||||
- Выход: `{job_id, status, attempt, started_at?, finished_at?, heartbeat_at?, error?, progress: {}}`
|
|
||||||
|
|
||||||
- POST `/api/v1/jobs/{job_id}/cancel`
|
**Request:**
|
||||||
- Выход: как в status
|
```json
|
||||||
- Поведение: устанавливает `cancel_requested = true`; воркер завершает между шагами пайплайна.
|
{
|
||||||
|
"queue": "load.cbr", // обязательно: имя очереди
|
||||||
|
"task": "load.cbr.rates", // обязательно: имя задачи для registry
|
||||||
|
"args": { // опционально: аргументы задачи
|
||||||
|
"date": "2025-01-10",
|
||||||
|
"currencies": ["USD", "EUR"]
|
||||||
|
},
|
||||||
|
"idempotency_key": "cbr_2025-01-10", // опционально: ключ идемпотентности
|
||||||
|
"lock_key": "cbr_rates", // обязательно: ключ для advisory lock
|
||||||
|
"partition_key": "2025-01-10", // опционально: ключ партиционирования
|
||||||
|
"priority": 100, // опционально: приоритет (меньше = выше)
|
||||||
|
"available_at": "2025-01-10T00:00:00Z", // опционально: отложенный запуск
|
||||||
|
"max_attempts": 3, // опционально: макс попыток (def: 5)
|
||||||
|
"lease_ttl_sec": 300, // опционально: TTL аренды (def: 60)
|
||||||
|
"producer": "api-client", // опционально: кто поставил
|
||||||
|
"consumer_group": "cbr-loaders" // опционально: группа потребителей
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
Инфраструктурные: `/health`, `/info`.
|
**Response 200:**
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"job_id": "550e8400-e29b-41d4-a716-446655440000",
|
||||||
|
"status": "queued"
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Коды ответов:**
|
||||||
|
- `200 OK` - задача создана или уже существует (идемпотентность)
|
||||||
|
- `400 Bad Request` - невалидные данные
|
||||||
|
- `500 Internal Server Error` - ошибка сервера
|
||||||
|
|
||||||
|
### GET `/api/v1/jobs/{job_id}/status`
|
||||||
|
|
||||||
|
Получение статуса задачи.
|
||||||
|
|
||||||
|
**Response 200:**
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"job_id": "550e8400-e29b-41d4-a716-446655440000",
|
||||||
|
"status": "running", // queued/running/succeeded/failed/canceled
|
||||||
|
"attempt": 1, // текущая попытка
|
||||||
|
"started_at": "2025-01-10T12:00:00Z", // время первого запуска
|
||||||
|
"finished_at": null, // время завершения (если есть)
|
||||||
|
"heartbeat_at": "2025-01-10T12:01:30Z", // последний heartbeat
|
||||||
|
"error": null, // текст ошибки (если есть)
|
||||||
|
"progress": { // прогресс выполнения (custom)
|
||||||
|
"processed": 500,
|
||||||
|
"total": 1000
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Коды ответов:**
|
||||||
|
- `200 OK` - статус получен
|
||||||
|
- `404 Not Found` - задача не найдена
|
||||||
|
|
||||||
|
### POST `/api/v1/jobs/{job_id}/cancel`
|
||||||
|
|
||||||
|
Запрос кооперативной отмены задачи.
|
||||||
|
|
||||||
|
**Response 200:**
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"job_id": "550e8400-e29b-41d4-a716-446655440000",
|
||||||
|
"status": "running",
|
||||||
|
"attempt": 1,
|
||||||
|
"started_at": "2025-01-10T12:00:00Z",
|
||||||
|
"heartbeat_at": "2025-01-10T12:01:30Z"
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Поведение:**
|
||||||
|
- Устанавливает флаг `cancel_requested = true` в БД
|
||||||
|
- Воркер проверяет флаг между `yield` в пайплайне
|
||||||
|
- При обнаружении флага воркер завершает задачу со статусом `canceled`
|
||||||
|
|
||||||
|
**Коды ответов:**
|
||||||
|
- `200 OK` - запрос отмены принят
|
||||||
|
- `404 Not Found` - задача не найдена
|
||||||
|
|
||||||
|
### Инфраструктурные эндпоинты
|
||||||
|
|
||||||
|
**GET `/health`** - проверка работоспособности (без БД, < 20ms)
|
||||||
|
```json
|
||||||
|
{"status": "healthy"}
|
||||||
|
```
|
||||||
|
|
||||||
|
**GET `/info`** - информация о сервисе
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"service": "dataloader",
|
||||||
|
"version": "1.0.0",
|
||||||
|
"environment": "production"
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
## Воркеры, пайплайны и добавление новых ETL‑задач
|
## Воркеры, пайплайны и добавление новых ETL‑задач
|
||||||
|
|
@ -178,6 +267,78 @@ src/dataloader/
|
||||||
5. Выполняет пайплайн (`task`) с поддержкой итеративных шагов и кооперативной отмены.
|
5. Выполняет пайплайн (`task`) с поддержкой итеративных шагов и кооперативной отмены.
|
||||||
6. По завершении: `succeeded` или `failed`/`canceled`; при ошибках возможны ретраи до `max_attempts`.
|
6. По завершении: `succeeded` или `failed`/`canceled`; при ошибках возможны ретраи до `max_attempts`.
|
||||||
|
|
||||||
|
### Протокол выполнения задачи (SQL)
|
||||||
|
|
||||||
|
**Claim (захват задачи):**
|
||||||
|
```sql
|
||||||
|
WITH cte AS (
|
||||||
|
SELECT job_id
|
||||||
|
FROM dl_jobs
|
||||||
|
WHERE status = 'queued'
|
||||||
|
AND queue = :queue
|
||||||
|
AND available_at <= now()
|
||||||
|
ORDER BY priority ASC, created_at ASC
|
||||||
|
FOR UPDATE SKIP LOCKED
|
||||||
|
LIMIT 1
|
||||||
|
)
|
||||||
|
UPDATE dl_jobs j
|
||||||
|
SET status = 'running',
|
||||||
|
started_at = COALESCE(started_at, now()),
|
||||||
|
attempt = attempt + 1,
|
||||||
|
lease_expires_at = now() + make_interval(secs => j.lease_ttl_sec),
|
||||||
|
heartbeat_at = now()
|
||||||
|
FROM cte
|
||||||
|
WHERE j.job_id = cte.job_id
|
||||||
|
RETURNING j.job_id, j.task, j.args, j.lock_key, j.lease_ttl_sec;
|
||||||
|
```
|
||||||
|
|
||||||
|
**Heartbeat (продление аренды):**
|
||||||
|
```sql
|
||||||
|
UPDATE dl_jobs
|
||||||
|
SET heartbeat_at = now(),
|
||||||
|
lease_expires_at = now() + make_interval(secs => :ttl)
|
||||||
|
WHERE job_id = :job_id AND status = 'running'
|
||||||
|
RETURNING cancel_requested;
|
||||||
|
```
|
||||||
|
|
||||||
|
**Завершение успешное:**
|
||||||
|
```sql
|
||||||
|
UPDATE dl_jobs
|
||||||
|
SET status = 'succeeded',
|
||||||
|
finished_at = now(),
|
||||||
|
lease_expires_at = NULL
|
||||||
|
WHERE job_id = :job_id;
|
||||||
|
|
||||||
|
SELECT pg_advisory_unlock(hashtext(:lock_key));
|
||||||
|
```
|
||||||
|
|
||||||
|
**Завершение с ошибкой (retry):**
|
||||||
|
```sql
|
||||||
|
UPDATE dl_jobs
|
||||||
|
SET status = CASE WHEN attempt < max_attempts THEN 'queued' ELSE 'failed' END,
|
||||||
|
available_at = CASE WHEN attempt < max_attempts
|
||||||
|
THEN now() + make_interval(secs => 30 * attempt)
|
||||||
|
ELSE now() END,
|
||||||
|
error = :error_message,
|
||||||
|
lease_expires_at = NULL,
|
||||||
|
finished_at = CASE WHEN attempt >= max_attempts THEN now() ELSE NULL END
|
||||||
|
WHERE job_id = :job_id;
|
||||||
|
|
||||||
|
SELECT pg_advisory_unlock(hashtext(:lock_key));
|
||||||
|
```
|
||||||
|
|
||||||
|
**Reaper (возврат потерянных задач):**
|
||||||
|
```sql
|
||||||
|
UPDATE dl_jobs
|
||||||
|
SET status = 'queued',
|
||||||
|
available_at = now(),
|
||||||
|
lease_expires_at = NULL
|
||||||
|
WHERE status = 'running'
|
||||||
|
AND lease_expires_at IS NOT NULL
|
||||||
|
AND lease_expires_at < now()
|
||||||
|
RETURNING job_id;
|
||||||
|
```
|
||||||
|
|
||||||
### Интерфейс пайплайна
|
### Интерфейс пайплайна
|
||||||
Пайплайн - обычная функция, возвращающая одно из:
|
Пайплайн - обычная функция, возвращающая одно из:
|
||||||
- асинхронный генератор шагов (рекомендуется для длинных процессов),
|
- асинхронный генератор шагов (рекомендуется для длинных процессов),
|
||||||
|
|
@ -214,11 +375,84 @@ async def load_customers(args: dict) -> AsyncIterator[None]:
|
||||||
- Для бизнес‑взаимного исключения выбирайте корректный `lock_key` (например, `customer:{id}`), чтобы параллельные задачи не конфликтовали.
|
- Для бизнес‑взаимного исключения выбирайте корректный `lock_key` (например, `customer:{id}`), чтобы параллельные задачи не конфликтовали.
|
||||||
|
|
||||||
### Добавление ETL‑задачи (шаги)
|
### Добавление ETL‑задачи (шаги)
|
||||||
1. Создать модуль в `workers/pipelines/` и зарегистрировать обработчик через `@register`.
|
|
||||||
2. Убедиться, что модуль импортируется автоматически (происходит на старте через `load_all()`).
|
**1. Создать пайплайн** в `src/dataloader/workers/pipelines/`:
|
||||||
3. При необходимости расширить схему `args` и валидацию на стороне продьюсера (кто вызывает API).
|
|
||||||
4. При постановке задачи в `POST /api/v1/jobs/trigger` указать `task` с новым именем и желаемые `args`.
|
```python
|
||||||
5. При необходимости завести отдельную очередь (`queue`) и добавить её в `WORKERS_JSON` с нужной `concurrency`.
|
# src/dataloader/workers/pipelines/load_cbr_rates.py
|
||||||
|
from __future__ import annotations
|
||||||
|
from typing import AsyncIterator
|
||||||
|
from datetime import datetime
|
||||||
|
from dataloader.workers.pipelines.registry import register
|
||||||
|
|
||||||
|
@register("load.cbr.rates")
|
||||||
|
async def load_cbr_rates(args: dict) -> AsyncIterator[None]:
|
||||||
|
"""
|
||||||
|
Загрузка курсов валют ЦБ РФ.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
args: {"date": "YYYY-MM-DD", "currencies": ["USD", "EUR"]}
|
||||||
|
"""
|
||||||
|
date = datetime.fromisoformat(args["date"])
|
||||||
|
currencies = args.get("currencies", ["USD", "EUR"])
|
||||||
|
|
||||||
|
# Извлечение данных
|
||||||
|
data = await fetch_cbr_rates(date, currencies)
|
||||||
|
yield # Heartbeat checkpoint
|
||||||
|
|
||||||
|
# Трансформация
|
||||||
|
transformed = transform_rates(data)
|
||||||
|
yield # Heartbeat checkpoint
|
||||||
|
|
||||||
|
# Идемпотентная загрузка в БД
|
||||||
|
async with get_target_session() as session:
|
||||||
|
await session.execute(
|
||||||
|
insert(CbrRates)
|
||||||
|
.values(transformed)
|
||||||
|
.on_conflict_do_update(
|
||||||
|
index_elements=["date", "currency"],
|
||||||
|
set_={"rate": excluded.c.rate, "updated_at": func.now()}
|
||||||
|
)
|
||||||
|
)
|
||||||
|
await session.commit()
|
||||||
|
yield
|
||||||
|
```
|
||||||
|
|
||||||
|
**2. Настроить воркеры** в `.env`:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
WORKERS_JSON='[{"queue":"load.cbr","concurrency":2}]'
|
||||||
|
```
|
||||||
|
|
||||||
|
**3. Поставить задачу через API**:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
curl -X POST http://localhost:8081/api/v1/jobs/trigger \
|
||||||
|
-H "Content-Type: application/json" \
|
||||||
|
-d '{
|
||||||
|
"queue": "load.cbr",
|
||||||
|
"task": "load.cbr.rates",
|
||||||
|
"args": {"date": "2025-01-10", "currencies": ["USD", "EUR"]},
|
||||||
|
"lock_key": "cbr_rates_2025-01-10",
|
||||||
|
"partition_key": "2025-01-10",
|
||||||
|
"priority": 100,
|
||||||
|
"max_attempts": 3,
|
||||||
|
"lease_ttl_sec": 300
|
||||||
|
}'
|
||||||
|
```
|
||||||
|
|
||||||
|
**4. Мониторить выполнение**:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Получить job_id из ответа и проверить статус
|
||||||
|
curl http://localhost:8081/api/v1/jobs/{job_id}/status
|
||||||
|
```
|
||||||
|
|
||||||
|
**Ключевые моменты**:
|
||||||
|
- Пайплайн должен быть идемпотентным (повторный запуск не должен ломать данные)
|
||||||
|
- Используйте `yield` после каждого значимого чанка работы для heartbeat
|
||||||
|
- `lock_key` должен обеспечивать взаимное исключение (например, `customer:{id}`)
|
||||||
|
- `partition_key` используется для параллелизации независимых задач
|
||||||
|
|
||||||
|
|
||||||
## Логирование, метрики, аудит
|
## Логирование, метрики, аудит
|
||||||
|
|
@ -228,19 +462,257 @@ async def load_customers(args: dict) -> AsyncIterator[None]:
|
||||||
|
|
||||||
|
|
||||||
## Тестирование
|
## Тестирование
|
||||||
- Юнит‑тесты: `tests/unit/*`
|
|
||||||
- Интеграционные тесты: `tests/integration_tests/*` - покрывают API, репозиторий очереди, reaper.
|
### Структура тестов
|
||||||
- Запуск:
|
|
||||||
```bash
|
```
|
||||||
poetry run pytest -q
|
tests/
|
||||||
|
├── conftest.py # Глобальные фикстуры (db_engine, db_session, client)
|
||||||
|
├── integration_tests/ # Интеграционные тесты с реальной БД
|
||||||
|
│ ├── test_queue_repository.py # 12 тестов репозитория
|
||||||
|
│ └── test_api_endpoints.py # 7 тестов API endpoints
|
||||||
|
└── unit/ # Юнит-тесты с моками (92 теста)
|
||||||
|
├── test_config.py # 30 тестов конфигурации
|
||||||
|
├── test_context.py # 13 тестов AppContext
|
||||||
|
├── test_api_service.py # 10 тестов сервисного слоя
|
||||||
|
├── test_notify_listener.py # 13 тестов LISTEN/NOTIFY
|
||||||
|
├── test_workers_base.py # 14 тестов PGWorker
|
||||||
|
├── test_workers_manager.py # 10 тестов WorkerManager
|
||||||
|
└── test_pipeline_registry.py # 5 тестов реестра пайплайнов
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### Запуск тестов
|
||||||
|
|
||||||
## Эксплуатация и масштабирование
|
```bash
|
||||||
- Один процесс FastAPI с пулом асинхронных воркеров внутри. Масштабирование - количеством реплик (Deployment). Очередь в БД и advisory‑lock обеспечат корректность при гонках между репликами.
|
# Все тесты (111 тестов)
|
||||||
- Readiness/Liveness - `/health`.
|
poetry run pytest
|
||||||
- Корректное завершение: при SIGTERM менеджер подаёт сигнал остановки, завершает воркеры и reaper, закрывает соединения.
|
|
||||||
|
# Только юнит-тесты
|
||||||
|
poetry run pytest tests/unit/ -m unit
|
||||||
|
|
||||||
|
# Только интеграционные
|
||||||
|
poetry run pytest tests/integration_tests/ -m integration
|
||||||
|
|
||||||
|
# С покрытием кода
|
||||||
|
poetry run pytest --cov=dataloader --cov-report=html
|
||||||
|
|
||||||
|
# С подробным выводом
|
||||||
|
poetry run pytest -v -s
|
||||||
|
```
|
||||||
|
|
||||||
|
### Покрытие кода
|
||||||
|
|
||||||
|
Текущее покрытие: **91%** (788 строк / 715 покрыто)
|
||||||
|
|
||||||
|
```
|
||||||
|
Name Stmts Miss Cover
|
||||||
|
---------------------------------------------------------------
|
||||||
|
src/dataloader/config.py 79 0 100%
|
||||||
|
src/dataloader/context.py 39 0 100%
|
||||||
|
src/dataloader/api/v1/service.py 32 0 100%
|
||||||
|
src/dataloader/storage/models/queue.py 43 0 100%
|
||||||
|
src/dataloader/storage/schemas/queue.py 29 0 100%
|
||||||
|
src/dataloader/storage/notify_listener.py 49 0 100%
|
||||||
|
src/dataloader/workers/base.py 102 3 97%
|
||||||
|
src/dataloader/workers/manager.py 64 0 100%
|
||||||
|
src/dataloader/storage/repositories/queue 130 12 91%
|
||||||
|
---------------------------------------------------------------
|
||||||
|
TOTAL 788 73 91%
|
||||||
|
```
|
||||||
|
|
||||||
|
### Ключевые тест-сценарии
|
||||||
|
|
||||||
|
**Интеграционные тесты:**
|
||||||
|
- Постановка задачи через API → проверка статуса
|
||||||
|
- Идемпотентность через `idempotency_key`
|
||||||
|
- Claim задачи → heartbeat → успешное завершение
|
||||||
|
- Claim задачи → ошибка → retry → финальный fail
|
||||||
|
- Конкуренция воркеров через advisory lock
|
||||||
|
- Возврат потерянных задач (reaper)
|
||||||
|
- Отмена задачи пользователем
|
||||||
|
|
||||||
|
**Юнит-тесты:**
|
||||||
|
- Конфигурация из переменных окружения
|
||||||
|
- Создание и управление воркерами
|
||||||
|
- LISTEN/NOTIFY механизм
|
||||||
|
- Сервисный слой и репозиторий
|
||||||
|
- Протокол heartbeat и отмены
|
||||||
|
|
||||||
|
|
||||||
## Лицензия
|
### Масштабирование
|
||||||
Внутренний корпоративный проект.
|
|
||||||
|
- **Вертикальное**: Увеличение `concurrency` в `WORKERS_JSON` для существующих воркеров
|
||||||
|
- **Горизонтальное**: Увеличение количества реплик (pods). Очередь в БД и advisory-lock обеспечат корректность при конкуренции между репликами
|
||||||
|
- **По очередям**: Разные deployment'ы для разных очередей с разными ресурсами
|
||||||
|
|
||||||
|
### Graceful Shutdown
|
||||||
|
|
||||||
|
При получении SIGTERM:
|
||||||
|
1. Останавливает прием новых HTTP запросов
|
||||||
|
2. Сигнализирует воркерам о необходимости завершения
|
||||||
|
3. Ждет завершения текущих задач (timeout 30 сек)
|
||||||
|
4. Останавливает reaper
|
||||||
|
5. Закрывает соединения с БД
|
||||||
|
|
||||||
|
### Мониторинг
|
||||||
|
|
||||||
|
**Health Checks:**
|
||||||
|
- `GET /health` - проверка работоспособности (без БД, < 20ms)
|
||||||
|
- `GET /info` - информация о версии
|
||||||
|
|
||||||
|
**Метрики (если включен metric_router):**
|
||||||
|
- Количество задач по статусам (queued, running, succeeded, failed)
|
||||||
|
- Время выполнения задач (p50, p95, p99)
|
||||||
|
- Количество активных воркеров
|
||||||
|
- Частота ошибок
|
||||||
|
|
||||||
|
**Логи:**
|
||||||
|
Структурированные JSON-логи с уровнями: DEBUG, INFO, WARNING, ERROR, CRITICAL
|
||||||
|
|
||||||
|
**Ключевые события для алертов:**
|
||||||
|
- `worker.claim.backoff` - частые backoff'ы (возможна конкуренция)
|
||||||
|
- `worker.complete.failed` - высокий процент ошибок
|
||||||
|
- `reaper.requeued` - частый возврат потерянных задач (проблемы с lease)
|
||||||
|
- `api.error` - ошибки API
|
||||||
|
|
||||||
|
|
||||||
|
## Troubleshooting
|
||||||
|
|
||||||
|
### Задачи застревают в статусе `queued`
|
||||||
|
|
||||||
|
**Симптомы:** Задачи не начинают выполняться, остаются в `queued`.
|
||||||
|
|
||||||
|
**Возможные причины:**
|
||||||
|
1. Воркеры не запущены или упали
|
||||||
|
2. Нет воркеров для данной очереди в `WORKERS_JSON`
|
||||||
|
3. `available_at` в будущем
|
||||||
|
|
||||||
|
**Решение:**
|
||||||
|
```bash
|
||||||
|
# Проверить логи воркеров
|
||||||
|
docker logs dataloader | grep worker
|
||||||
|
|
||||||
|
# Проверить конфигурацию
|
||||||
|
echo $WORKERS_JSON
|
||||||
|
|
||||||
|
# Проверить задачи в БД
|
||||||
|
SELECT job_id, queue, status, available_at, created_at
|
||||||
|
FROM dl_jobs
|
||||||
|
WHERE status = 'queued'
|
||||||
|
ORDER BY created_at DESC
|
||||||
|
LIMIT 10;
|
||||||
|
```
|
||||||
|
|
||||||
|
### Задачи часто возвращаются в `queued` (backoff)
|
||||||
|
|
||||||
|
**Симптомы:** В логах частые события `worker.claim.backoff`.
|
||||||
|
|
||||||
|
**Причины:**
|
||||||
|
- Конкуренция за `lock_key`: несколько задач с одинаковым `lock_key` одновременно
|
||||||
|
- Advisory lock уже занят другим процессом
|
||||||
|
|
||||||
|
**Решение:**
|
||||||
|
- Проверить корректность выбора `lock_key` (должен быть уникальным для бизнес-сущности)
|
||||||
|
- Использовать `partition_key` для распределения нагрузки
|
||||||
|
- Снизить `concurrency` для данной очереди
|
||||||
|
|
||||||
|
### Высокий процент `failed` задач
|
||||||
|
|
||||||
|
**Симптомы:** Много задач завершаются с `status = 'failed'`.
|
||||||
|
|
||||||
|
**Диагностика:**
|
||||||
|
```sql
|
||||||
|
SELECT job_id, task, error, attempt, max_attempts
|
||||||
|
FROM dl_jobs
|
||||||
|
WHERE status = 'failed'
|
||||||
|
ORDER BY finished_at DESC
|
||||||
|
LIMIT 20;
|
||||||
|
```
|
||||||
|
|
||||||
|
**Возможные причины:**
|
||||||
|
- Ошибки в коде пайплайна
|
||||||
|
- Недоступность внешних сервисов
|
||||||
|
- Таймауты (превышение `lease_ttl_sec`)
|
||||||
|
- Неверные аргументы в `args`
|
||||||
|
|
||||||
|
**Решение:**
|
||||||
|
- Проверить логи с `job_id`
|
||||||
|
- Увеличить `max_attempts` для retry
|
||||||
|
- Увеличить `lease_ttl_sec` для долгих операций
|
||||||
|
- Исправить код пайплайна
|
||||||
|
|
||||||
|
|
||||||
|
### Медленное выполнение задач
|
||||||
|
|
||||||
|
**Симптомы:** Задачи выполняются дольше ожидаемого.
|
||||||
|
|
||||||
|
**Диагностика:**
|
||||||
|
```sql
|
||||||
|
SELECT
|
||||||
|
task,
|
||||||
|
AVG(EXTRACT(EPOCH FROM (finished_at - started_at))) as avg_duration_sec,
|
||||||
|
COUNT(*) as total
|
||||||
|
FROM dl_jobs
|
||||||
|
WHERE status IN ('succeeded', 'failed')
|
||||||
|
AND finished_at > NOW() - INTERVAL '1 hour'
|
||||||
|
GROUP BY task
|
||||||
|
ORDER BY avg_duration_sec DESC;
|
||||||
|
```
|
||||||
|
|
||||||
|
**Возможные причины:**
|
||||||
|
- Неоптимальный код пайплайна
|
||||||
|
- Медленные внешние сервисы
|
||||||
|
- Недостаточно воркеров (`concurrency` слишком мал)
|
||||||
|
- Проблемы с БД (медленные запросы, блокировки)
|
||||||
|
|
||||||
|
**Решение:**
|
||||||
|
- Профилировать код пайплайна
|
||||||
|
- Увеличить `concurrency` в `WORKERS_JSON`
|
||||||
|
- Оптимизировать запросы к БД (индексы, batching)
|
||||||
|
- Масштабировать горизонтально (больше реплик)
|
||||||
|
|
||||||
|
### Утечка памяти
|
||||||
|
|
||||||
|
**Симптомы:** Постепенный рост потребления памяти, OOM kills.
|
||||||
|
|
||||||
|
**Диагностика:**
|
||||||
|
```bash
|
||||||
|
# Мониторинг памяти
|
||||||
|
kubectl top pods -l app=dataloader --containers
|
||||||
|
|
||||||
|
# Проверить логи перед падением
|
||||||
|
kubectl logs dataloader-xxx --previous
|
||||||
|
```
|
||||||
|
|
||||||
|
**Возможные причины:**
|
||||||
|
- Накопление объектов в памяти в пайплайне
|
||||||
|
- Незакрытые соединения/файлы
|
||||||
|
- Утечки в зависимостях
|
||||||
|
|
||||||
|
**Решение:**
|
||||||
|
- Использовать context managers (`async with`) для ресурсов
|
||||||
|
- Обрабатывать данные чанками, не загружать всё в память
|
||||||
|
- Периодически перезапускать воркеры (restart policy)
|
||||||
|
|
||||||
|
### Проблемы с LISTEN/NOTIFY
|
||||||
|
|
||||||
|
**Симптомы:** Воркеры не просыпаются сразу после постановки задачи.
|
||||||
|
|
||||||
|
**Диагностика:**
|
||||||
|
```bash
|
||||||
|
# Проверить логи listener
|
||||||
|
docker logs dataloader | grep "notify_listener\|LISTEN"
|
||||||
|
|
||||||
|
# Проверить триггеры в БД
|
||||||
|
SELECT * FROM pg_trigger WHERE tgname LIKE 'dl_jobs%';
|
||||||
|
```
|
||||||
|
|
||||||
|
**Возможные причины:**
|
||||||
|
- Триггеры не созданы или отключены
|
||||||
|
- Проблемы с подключением asyncpg
|
||||||
|
- Воркер не подписан на канал
|
||||||
|
|
||||||
|
**Решение:**
|
||||||
|
- Воркер автоматически fallback'ится на polling при проблемах с LISTEN/NOTIFY
|
||||||
|
- Проверить DDL: триггеры `dl_jobs_notify_ins` и `dl_jobs_notify_upd`
|
||||||
|
- Проверить права пользователя БД на LISTEN/NOTIFY
|
||||||
|
|
|
||||||
|
|
@ -13,12 +13,4 @@ class JobNotFoundError(HTTPException):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class JobAlreadyCanceledError(HTTPException):
|
|
||||||
"""Задача уже отменена."""
|
|
||||||
|
|
||||||
def __init__(self, job_id: str):
|
|
||||||
super().__init__(
|
|
||||||
status_code=status.HTTP_400_BAD_REQUEST,
|
|
||||||
detail=f"Job {job_id} is already canceled or finished"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -6,8 +6,9 @@ from http import HTTPStatus
|
||||||
from typing import Annotated
|
from typing import Annotated
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
|
|
||||||
from fastapi import APIRouter, Depends, HTTPException
|
from fastapi import APIRouter, Depends
|
||||||
|
|
||||||
|
from dataloader.api.v1.exceptions import JobNotFoundError
|
||||||
from dataloader.api.v1.schemas import (
|
from dataloader.api.v1.schemas import (
|
||||||
JobStatusResponse,
|
JobStatusResponse,
|
||||||
TriggerJobRequest,
|
TriggerJobRequest,
|
||||||
|
|
@ -49,7 +50,7 @@ async def get_status(
|
||||||
"""
|
"""
|
||||||
st = await svc.status(job_id)
|
st = await svc.status(job_id)
|
||||||
if not st:
|
if not st:
|
||||||
raise HTTPException(status_code=HTTPStatus.NOT_FOUND, detail="job not found")
|
raise JobNotFoundError(job_id=str(job_id))
|
||||||
return st
|
return st
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -63,5 +64,5 @@ async def cancel_job(
|
||||||
"""
|
"""
|
||||||
st = await svc.cancel(job_id)
|
st = await svc.cancel(job_id)
|
||||||
if not st:
|
if not st:
|
||||||
raise HTTPException(status_code=HTTPStatus.NOT_FOUND, detail="job not found")
|
raise JobNotFoundError(job_id=str(job_id))
|
||||||
return st
|
return st
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,7 @@ from dataloader.storage.engine import create_engine, create_sessionmaker
|
||||||
if sys.platform == "win32":
|
if sys.platform == "win32":
|
||||||
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
|
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
|
||||||
|
|
||||||
|
pytestmark = pytest.mark.asyncio
|
||||||
|
|
||||||
@pytest_asyncio.fixture(scope="function")
|
@pytest_asyncio.fixture(scope="function")
|
||||||
async def db_engine() -> AsyncGenerator[AsyncEngine, None]:
|
async def db_engine() -> AsyncGenerator[AsyncEngine, None]:
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,33 @@
|
||||||
|
# tests/unit/test_api_router_not_found.py
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from uuid import uuid4, UUID
|
||||||
|
|
||||||
|
from dataloader.api.v1.router import get_status, cancel_job
|
||||||
|
from dataloader.api.v1.exceptions import JobNotFoundError
|
||||||
|
from dataloader.api.v1.schemas import JobStatusResponse
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeSvc:
|
||||||
|
async def status(self, job_id: UUID) -> JobStatusResponse | None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def cancel(self, job_id: UUID) -> JobStatusResponse | None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.unit
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_router_get_status_raises_job_not_found():
|
||||||
|
svc = _FakeSvc()
|
||||||
|
with pytest.raises(JobNotFoundError):
|
||||||
|
await get_status(uuid4(), svc=svc)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.unit
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_router_cancel_raises_job_not_found():
|
||||||
|
svc = _FakeSvc()
|
||||||
|
with pytest.raises(JobNotFoundError):
|
||||||
|
await cancel_job(uuid4(), svc=svc)
|
||||||
|
|
@ -0,0 +1,57 @@
|
||||||
|
# tests/unit/test_api_router_success.py
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from uuid import uuid4, UUID
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
|
from dataloader.api.v1.router import get_status, cancel_job
|
||||||
|
from dataloader.api.v1.schemas import JobStatusResponse
|
||||||
|
|
||||||
|
|
||||||
|
class _SvcOK:
|
||||||
|
async def status(self, job_id: UUID) -> JobStatusResponse | None:
|
||||||
|
return JobStatusResponse(
|
||||||
|
job_id=job_id,
|
||||||
|
status="queued",
|
||||||
|
attempt=0,
|
||||||
|
started_at=None,
|
||||||
|
finished_at=None,
|
||||||
|
heartbeat_at=None,
|
||||||
|
error=None,
|
||||||
|
progress={},
|
||||||
|
)
|
||||||
|
|
||||||
|
async def cancel(self, job_id: UUID) -> JobStatusResponse | None:
|
||||||
|
return JobStatusResponse(
|
||||||
|
job_id=job_id,
|
||||||
|
status="canceled",
|
||||||
|
attempt=1,
|
||||||
|
started_at=datetime.now(timezone.utc),
|
||||||
|
finished_at=datetime.now(timezone.utc),
|
||||||
|
heartbeat_at=None,
|
||||||
|
error="by test",
|
||||||
|
progress={},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.unit
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_router_get_status_returns_response():
|
||||||
|
svc = _SvcOK()
|
||||||
|
jid = uuid4()
|
||||||
|
res = await get_status(jid, svc=svc)
|
||||||
|
assert isinstance(res, JobStatusResponse)
|
||||||
|
assert res.job_id == jid
|
||||||
|
assert res.status == "queued"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.unit
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_router_cancel_returns_response():
|
||||||
|
svc = _SvcOK()
|
||||||
|
jid = uuid4()
|
||||||
|
res = await cancel_job(jid, svc=svc)
|
||||||
|
assert isinstance(res, JobStatusResponse)
|
||||||
|
assert res.job_id == jid
|
||||||
|
assert res.status == "canceled"
|
||||||
|
|
@ -1,11 +1,14 @@
|
||||||
# tests/integration_tests/test_queue_repository.py
|
# tests/integration_tests/test_queue_repository.py
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone, timedelta
|
||||||
|
from uuid import uuid4
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
from sqlalchemy import select
|
||||||
from sqlalchemy.ext.asyncio import AsyncSession
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
|
from dataloader.storage.models import DLJob
|
||||||
from dataloader.storage.repositories import QueueRepository
|
from dataloader.storage.repositories import QueueRepository
|
||||||
from dataloader.storage.schemas import CreateJobRequest, JobStatus
|
from dataloader.storage.schemas import CreateJobRequest, JobStatus
|
||||||
|
|
||||||
|
|
@ -451,3 +454,232 @@ class TestQueueRepository:
|
||||||
status = await repo.get_status(job_id)
|
status = await repo.get_status(job_id)
|
||||||
assert status is not None
|
assert status is not None
|
||||||
assert status.status == "queued"
|
assert status.status == "queued"
|
||||||
|
|
||||||
|
async def test_claim_one_fails_on_advisory_lock_and_sets_backoff(
|
||||||
|
self,
|
||||||
|
db_session: AsyncSession,
|
||||||
|
clean_queue_tables,
|
||||||
|
job_id: str,
|
||||||
|
queue_name: str,
|
||||||
|
task_name: str,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Проверка ветки отказа advisory-lock: задача возвращается в queued с отложенным available_at.
|
||||||
|
"""
|
||||||
|
repo = QueueRepository(db_session)
|
||||||
|
|
||||||
|
req = CreateJobRequest(
|
||||||
|
job_id=job_id,
|
||||||
|
queue=queue_name,
|
||||||
|
task=task_name,
|
||||||
|
args={"k": "v"},
|
||||||
|
idempotency_key=None,
|
||||||
|
lock_key="lock-fail-adv",
|
||||||
|
partition_key="",
|
||||||
|
priority=10,
|
||||||
|
available_at=datetime.now(timezone.utc),
|
||||||
|
max_attempts=5,
|
||||||
|
lease_ttl_sec=30,
|
||||||
|
producer=None,
|
||||||
|
consumer_group=None,
|
||||||
|
)
|
||||||
|
await repo.create_or_get(req)
|
||||||
|
|
||||||
|
async def _false_lock(_: str) -> bool:
|
||||||
|
return False
|
||||||
|
|
||||||
|
repo._try_advisory_lock = _false_lock # type: ignore[method-assign]
|
||||||
|
|
||||||
|
before = datetime.now(timezone.utc)
|
||||||
|
claimed = await repo.claim_one(queue_name, claim_backoff_sec=15)
|
||||||
|
after = datetime.now(timezone.utc)
|
||||||
|
|
||||||
|
assert claimed is None
|
||||||
|
|
||||||
|
st = await repo.get_status(job_id)
|
||||||
|
assert st is not None
|
||||||
|
assert st.status == "queued"
|
||||||
|
|
||||||
|
row = (await db_session.execute(select(DLJob).where(DLJob.job_id == job_id))).scalar_one()
|
||||||
|
assert row.available_at >= before + timedelta(seconds=15)
|
||||||
|
assert row.available_at <= after + timedelta(seconds=60)
|
||||||
|
|
||||||
|
async def test_heartbeat_when_not_running_returns_false(
|
||||||
|
self,
|
||||||
|
db_session: AsyncSession,
|
||||||
|
clean_queue_tables,
|
||||||
|
job_id: str,
|
||||||
|
queue_name: str,
|
||||||
|
task_name: str,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Heartbeat для нерunning задачи возвращает (False, False).
|
||||||
|
"""
|
||||||
|
repo = QueueRepository(db_session)
|
||||||
|
|
||||||
|
req = CreateJobRequest(
|
||||||
|
job_id=job_id,
|
||||||
|
queue=queue_name,
|
||||||
|
task=task_name,
|
||||||
|
args={},
|
||||||
|
idempotency_key=None,
|
||||||
|
lock_key="lock-hb-not-running",
|
||||||
|
partition_key="",
|
||||||
|
priority=100,
|
||||||
|
available_at=datetime.now(timezone.utc),
|
||||||
|
max_attempts=5,
|
||||||
|
lease_ttl_sec=60,
|
||||||
|
producer=None,
|
||||||
|
consumer_group=None,
|
||||||
|
)
|
||||||
|
await repo.create_or_get(req)
|
||||||
|
|
||||||
|
ok, cancel = await repo.heartbeat(job_id, ttl_sec=30)
|
||||||
|
assert ok is False
|
||||||
|
assert cancel is False
|
||||||
|
|
||||||
|
async def test_finish_fail_or_retry_marks_canceled_branch(
|
||||||
|
self,
|
||||||
|
db_session: AsyncSession,
|
||||||
|
clean_queue_tables,
|
||||||
|
job_id: str,
|
||||||
|
queue_name: str,
|
||||||
|
task_name: str,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Ветка is_canceled=True помечает задачу как canceled и завершает её.
|
||||||
|
"""
|
||||||
|
repo = QueueRepository(db_session)
|
||||||
|
|
||||||
|
req = CreateJobRequest(
|
||||||
|
job_id=job_id,
|
||||||
|
queue=queue_name,
|
||||||
|
task=task_name,
|
||||||
|
args={},
|
||||||
|
idempotency_key=None,
|
||||||
|
lock_key="lock-cancel",
|
||||||
|
partition_key="",
|
||||||
|
priority=100,
|
||||||
|
available_at=datetime.now(timezone.utc),
|
||||||
|
max_attempts=5,
|
||||||
|
lease_ttl_sec=60,
|
||||||
|
producer=None,
|
||||||
|
consumer_group=None,
|
||||||
|
)
|
||||||
|
await repo.create_or_get(req)
|
||||||
|
await repo.claim_one(queue_name, claim_backoff_sec=5)
|
||||||
|
|
||||||
|
await repo.finish_fail_or_retry(job_id, err="Canceled by test", is_canceled=True)
|
||||||
|
|
||||||
|
st = await repo.get_status(job_id)
|
||||||
|
assert st is not None
|
||||||
|
assert st.status == "canceled"
|
||||||
|
assert st.error == "Canceled by test"
|
||||||
|
assert st.finished_at is not None
|
||||||
|
|
||||||
|
async def test_requeue_lost_no_expired_returns_empty(
|
||||||
|
self,
|
||||||
|
db_session: AsyncSession,
|
||||||
|
clean_queue_tables,
|
||||||
|
job_id: str,
|
||||||
|
queue_name: str,
|
||||||
|
task_name: str,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
requeue_lost без протухших задач возвращает пустой список.
|
||||||
|
"""
|
||||||
|
repo = QueueRepository(db_session)
|
||||||
|
|
||||||
|
req = CreateJobRequest(
|
||||||
|
job_id=job_id,
|
||||||
|
queue=queue_name,
|
||||||
|
task=task_name,
|
||||||
|
args={},
|
||||||
|
idempotency_key=None,
|
||||||
|
lock_key="lock-none-expired",
|
||||||
|
partition_key="",
|
||||||
|
priority=100,
|
||||||
|
available_at=datetime.now(timezone.utc),
|
||||||
|
max_attempts=5,
|
||||||
|
lease_ttl_sec=120,
|
||||||
|
producer=None,
|
||||||
|
consumer_group=None,
|
||||||
|
)
|
||||||
|
await repo.create_or_get(req)
|
||||||
|
await repo.claim_one(queue_name, claim_backoff_sec=5)
|
||||||
|
|
||||||
|
res = await repo.requeue_lost(now=datetime.now(timezone.utc))
|
||||||
|
assert res == []
|
||||||
|
|
||||||
|
st = await repo.get_status(job_id)
|
||||||
|
assert st is not None
|
||||||
|
assert st.status == "running"
|
||||||
|
|
||||||
|
async def test_private_helpers_resolve_queue_and_advisory_unlock_are_executable(
|
||||||
|
self,
|
||||||
|
db_session: AsyncSession,
|
||||||
|
clean_queue_tables,
|
||||||
|
job_id: str,
|
||||||
|
queue_name: str,
|
||||||
|
task_name: str,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Прямые прогоны приватных методов для покрытия редких веток.
|
||||||
|
"""
|
||||||
|
repo = QueueRepository(db_session)
|
||||||
|
|
||||||
|
rq = CreateJobRequest(
|
||||||
|
job_id=job_id,
|
||||||
|
queue=queue_name,
|
||||||
|
task=task_name,
|
||||||
|
args={},
|
||||||
|
idempotency_key=None,
|
||||||
|
lock_key="lock-direct-unlock",
|
||||||
|
partition_key="",
|
||||||
|
priority=1,
|
||||||
|
available_at=datetime.now(timezone.utc),
|
||||||
|
max_attempts=1,
|
||||||
|
lease_ttl_sec=5,
|
||||||
|
producer=None,
|
||||||
|
consumer_group=None,
|
||||||
|
)
|
||||||
|
await repo.create_or_get(rq)
|
||||||
|
|
||||||
|
missing_uuid = str(uuid4())
|
||||||
|
qname = await repo._resolve_queue(missing_uuid) # type: ignore[attr-defined]
|
||||||
|
assert qname == ""
|
||||||
|
|
||||||
|
await repo._advisory_unlock("lock-direct-unlock") # type: ignore[attr-defined]
|
||||||
|
|
||||||
|
async def test_cancel_returns_false_for_nonexistent_job(
|
||||||
|
self,
|
||||||
|
db_session: AsyncSession,
|
||||||
|
clean_queue_tables,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Возвращает False при отмене несуществующей задачи.
|
||||||
|
"""
|
||||||
|
repo = QueueRepository(db_session)
|
||||||
|
assert await repo.cancel(str(uuid4())) is False
|
||||||
|
|
||||||
|
async def test_finish_ok_silent_when_job_absent(
|
||||||
|
self,
|
||||||
|
db_session: AsyncSession,
|
||||||
|
clean_queue_tables,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Тихо завершается, если задача не найдена.
|
||||||
|
"""
|
||||||
|
repo = QueueRepository(db_session)
|
||||||
|
await repo.finish_ok(str(uuid4()))
|
||||||
|
|
||||||
|
async def test_finish_fail_or_retry_noop_when_job_absent(
|
||||||
|
self,
|
||||||
|
db_session: AsyncSession,
|
||||||
|
clean_queue_tables,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Тихо выходит при отсутствии задачи.
|
||||||
|
"""
|
||||||
|
repo = QueueRepository(db_session)
|
||||||
|
await repo.finish_fail_or_retry(str(uuid4()), err="no-op")
|
||||||
|
|
|
||||||
|
|
@ -444,3 +444,69 @@ class TestPGWorker:
|
||||||
results.append(_)
|
results.append(_)
|
||||||
|
|
||||||
assert len(results) == 3
|
assert len(results) == 3
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_claim_and_execute_once_handles_shutdown_cancelled_error(self):
|
||||||
|
cfg = WorkerConfig(queue="test", heartbeat_sec=10, claim_backoff_sec=5)
|
||||||
|
stop_event = asyncio.Event()
|
||||||
|
|
||||||
|
with patch("dataloader.workers.base.APP_CTX") as mock_ctx, \
|
||||||
|
patch("dataloader.workers.base.QueueRepository") as mock_repo_cls:
|
||||||
|
|
||||||
|
mock_session = AsyncMock()
|
||||||
|
mock_sm = MagicMock()
|
||||||
|
mock_sm.return_value.__aenter__.return_value = mock_session
|
||||||
|
mock_sm.return_value.__aexit__.return_value = AsyncMock(return_value=False)
|
||||||
|
mock_ctx.get_logger.return_value = Mock()
|
||||||
|
mock_ctx.sessionmaker = mock_sm
|
||||||
|
|
||||||
|
mock_repo = Mock()
|
||||||
|
mock_repo.claim_one = AsyncMock(return_value={
|
||||||
|
"job_id": "test-job-id",
|
||||||
|
"lease_ttl_sec": 60,
|
||||||
|
"task": "test.task",
|
||||||
|
"args": {}
|
||||||
|
})
|
||||||
|
mock_repo.finish_fail_or_retry = AsyncMock()
|
||||||
|
mock_repo_cls.return_value = mock_repo
|
||||||
|
|
||||||
|
worker = PGWorker(cfg, stop_event)
|
||||||
|
|
||||||
|
async def raise_cancel(*_args, **_kwargs):
|
||||||
|
raise asyncio.CancelledError()
|
||||||
|
|
||||||
|
with patch.object(worker, "_execute_with_heartbeat", new=raise_cancel):
|
||||||
|
await worker._claim_and_execute_once()
|
||||||
|
|
||||||
|
mock_repo.finish_fail_or_retry.assert_called_once()
|
||||||
|
args, kwargs = mock_repo.finish_fail_or_retry.call_args
|
||||||
|
assert args[0] == "test-job-id"
|
||||||
|
assert "cancelled by shutdown" in args[1]
|
||||||
|
assert kwargs.get("is_canceled") is True
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_execute_with_heartbeat_raises_cancelled_when_stop_set(self):
|
||||||
|
cfg = WorkerConfig(queue="test", heartbeat_sec=1000, claim_backoff_sec=5)
|
||||||
|
stop_event = asyncio.Event()
|
||||||
|
stop_event.set()
|
||||||
|
|
||||||
|
with patch("dataloader.workers.base.APP_CTX") as mock_ctx, \
|
||||||
|
patch("dataloader.workers.base.QueueRepository") as mock_repo_cls:
|
||||||
|
|
||||||
|
mock_ctx.get_logger.return_value = Mock()
|
||||||
|
mock_ctx.sessionmaker = Mock()
|
||||||
|
mock_repo_cls.return_value = Mock()
|
||||||
|
|
||||||
|
worker = PGWorker(cfg, stop_event)
|
||||||
|
|
||||||
|
async def one_yield():
|
||||||
|
yield
|
||||||
|
|
||||||
|
with pytest.raises(asyncio.CancelledError):
|
||||||
|
await worker._execute_with_heartbeat("job-id", 60, one_yield())
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,27 @@
|
||||||
|
# tests/unit/test_workers_reaper.py
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from unittest.mock import AsyncMock, patch, Mock
|
||||||
|
|
||||||
|
from dataloader.workers.reaper import requeue_lost
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.unit
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_requeue_lost_calls_repository_and_returns_ids():
|
||||||
|
"""
|
||||||
|
Проверяет, что requeue_lost вызывает QueueRepository.requeue_lost и возвращает результат.
|
||||||
|
"""
|
||||||
|
fake_session = Mock()
|
||||||
|
|
||||||
|
with patch("dataloader.workers.reaper.QueueRepository") as repo_cls:
|
||||||
|
repo = Mock()
|
||||||
|
repo.requeue_lost = AsyncMock(return_value=["id1", "id2"])
|
||||||
|
repo_cls.return_value = repo
|
||||||
|
|
||||||
|
res = await requeue_lost(fake_session)
|
||||||
|
|
||||||
|
assert res == ["id1", "id2"]
|
||||||
|
repo_cls.assert_called_once_with(fake_session)
|
||||||
|
repo.requeue_lost.assert_awaited_once()
|
||||||
Loading…
Reference in New Issue