dataloader/PIPELINE_OPU.md

33 KiB
Raw Permalink Blame History

про бд для OPU - нужно полученные строки грузить также по курсору стримингом в бд, но перед этим надо truncate таблицу делать, DDL вот - CREATE TABLE brief_digital_certificate_opu ( object_id text DEFAULT '-'::text NOT NULL, -- ИД клиента (REPORT_ID) object_nm text NULL, -- Наименование клиента desk_nm text DEFAULT '-'::text NOT NULL, -- Наименование деска actdate date DEFAULT CURRENT_DATE NOT NULL, -- Дата расчета layer_cd text DEFAULT '-'::text NOT NULL, -- Код слоя layer_nm text NULL, -- Наименование слоя opu_cd text NOT NULL, -- Код статьи опу opu_nm_sh text NULL, -- Краткое наименование кода опу opu_nm text NULL, -- Наименование кода опу opu_lvl int4 DEFAULT '-1'::integer NOT NULL, -- Уровень opu_prnt_cd text DEFAULT '-'::text NOT NULL, -- Код родителя opu_prnt_nm_sh text NULL, -- Краткое наименование родителя opu_prnt_nm text NULL, -- Наименование родителя sum_amountrub_p_usd numeric NULL, -- Сумма wf_load_id int8 DEFAULT '-1'::integer NOT NULL, -- Идентификатор потока загрузки wf_load_dttm timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL, -- Дата загрузки wf_row_id int8 DEFAULT '-1'::integer NOT NULL, -- Номер строки object_tp text NULL, object_unit text DEFAULT '-'::text NOT NULL, measure text NULL, product_nm text NULL, product_prnt_nm text NULL, sum_amountrub_p_usd_s numeric NULL, CONSTRAINT brief_digital_certificate_opu_pk PRIMARY KEY (object_id, desk_nm, actdate, layer_cd, opu_cd, opu_lvl, opu_prnt_cd, object_unit) ); COMMENT ON TABLE brief_digital_certificate_opu IS 'Показатели ОПУ';

Это тестовый скрипт для взаимодействия с OPU Но тут надо сделать модель репозиторий, интерфейс и т.п, в скрипте показано только принцип получения данных.

test_export.py

""" Скрипт для тестирования экспорта:

  • запуск задачи
  • мониторинг статуса
  • скачивание (полное и по частям)
  • распаковка
  • подсчёт строк
  • замер размеров """

import asyncio import httpx import zstandard as zstd from loguru import logger from pathlib import Path

========= НАСТРОЙКИ =========

BASE_URL = "https://ci02533826-tib-brief.apps.ift-terra000024-edm.ocp.delta.sbrf.ru " ENDPOINT_START = "/export/opu/start" ENDPOINT_STATUS = "/export/{job_id}/status" ENDPOINT_DOWNLOAD = "/export/{job_id}/download"

POLL_INTERVAL = 2 TIMEOUT = 3600 CHUNK_SIZE = 8192

OUTPUT_DIR = Path("./export_test") OUTPUT_DIR.mkdir(exist_ok=True)

============================

def sizeof_fmt(num: int) -> str: """Форматирует размер файла в человекочитаемый вид.""" for unit in ['B', 'KB', 'MB', 'GB']: if num < 1024.0: return f"{num:.1f} {unit}" num /= 1024.0 return f"{num:.1f} TB"

async def download_full(client: httpx.AsyncClient, url: str, filepath: Path) -> bool: """Скачивает файл полностью.""" logger.info(f"⬇️ Полная загрузка: {filepath.name}") try: response = await client.get(url, follow_redirects=True) if response.status_code != 200: logger.error(f" Ошибка полной загрузки: {response.status_code} {response.text}") return False with open(filepath, "wb") as f: f.write(response.content) logger.success(f" Полная загрузка завершена: {sizeof_fmt(filepath.stat().st_size)}") return True except Exception as e: logger.error(f" Ошибка при полной загрузке: {e}") return False

async def download_range( client: httpx.AsyncClient, url: str, filepath: Path, start: int, end: int | None = None ) -> bool: """Скачивает диапазон байтов.""" headers = {"Range": f"bytes={start}-{end if end else ''}"} logger.info(f"⬇️ Загрузка диапазона {start}-{end if end else 'end'} -> {filepath.name}") try: response = await client.get(url, headers=headers, follow_redirects=True) if response.status_code != 206: logger.error(f" Ожидался 206 Partial Content, получен: {response.status_code}") return False with open(filepath, "wb") as f: f.write(response.content) logger.success(f" Диапазон сохранён: {sizeof_fmt(filepath.stat().st_size)} байт") return True except Exception as e: logger.error(f" Ошибка при загрузке диапазона: {e}") return False

async def main(): logger.info("🚀 Запуск теста экспорта данных") cert_path = r"C:\Users\23193453\Documents\code\cert\client_cert.pem" key_path = r"C:\Users\23193453\Documents\code\cert\client_cert.key" ca_path = r"C:\Users\23193453\Documents\code\cert\client_ca.pem"

async with httpx.AsyncClient(
    cert=(cert_path, key_path),
    verify=ca_path,
    timeout=TIMEOUT
) as client:
    try:
        # --- Шаг 1: Запуск задачи ---
        logger.info("📨 Отправка запроса на запуск экспорта OPU...")
        response = await client.post(BASE_URL + ENDPOINT_START)
        if response.status_code != 200:
            logger.error(f"❌ Ошибка запуска задачи: {response.status_code} {response.text}")
            return
        job_id = response.json()["job_id"]
        logger.info(f" Задача запущена: job_id={job_id}")

        # --- Шаг 2: Мониторинг статуса ---
        status_url = ENDPOINT_STATUS.format(job_id=job_id)
        logger.info("⏳ Ожидание завершения задачи...")
        start_wait = asyncio.get_event_loop().time()

        while True:
            response = await client.get(BASE_URL + status_url)
            if response.status_code != 200:
                logger.warning(f"⚠️ Ошибка при получении статуса: {response.status_code}")
                await asyncio.sleep(POLL_INTERVAL)
                continue

            status_data = response.json()
            status = status_data["status"]
            total_rows = status_data["total_rows"]

            elapsed = asyncio.get_event_loop().time() - start_wait
            logger.debug(f"📊 Статус: {status}, строк обработано: {total_rows}, прошло: {elapsed:.1f} с")

            if status == "completed":
                logger.info(f"🎉 Задача завершена! Обработано строк: {total_rows}")
                break
            elif status == "failed":
                logger.error(f"💥 Задача завершилась с ошибкой: {status_data['error']}")
                return
            elif status in ("pending", "running"):
                await asyncio.sleep(POLL_INTERVAL)
                continue

        download_url = BASE_URL + ENDPOINT_DOWNLOAD.format(job_id=job_id)

        # --- Тест 1: Полная загрузка ---
        full_path = OUTPUT_DIR / f"full_export_{job_id}.jsonl.zst"
        if not await download_full(client, download_url, full_path):
            return

        # --- Тест 2: Первые 1024 байта ---
        range_path = OUTPUT_DIR / f"range_head_{job_id}.bin"
        if not await download_range(client, download_url, range_path, start=0, end=1023):
            return

        # --- Тест 3: Возобновление с 1024 байта ---
        resume_path = OUTPUT_DIR / f"range_resume_{job_id}.bin"
        if not await download_range(client, download_url, resume_path, start=1024):
            return

        # --- Анализ полного архива ---
        archive_size = full_path.stat().st_size
        logger.success(f"📦 Полный архив: {sizeof_fmt(archive_size)}")

        # --- Распаковка ---
        unpacked_path = OUTPUT_DIR / f"export_{job_id}.jsonl"
        logger.info(f"📦 Распаковка архива в: {unpacked_path.name}")
        dctx = zstd.ZstdDecompressor()
        try:
            with open(full_path, "rb") as compressed:
                with open(unpacked_path, "wb") as dest:
                    dctx.copy_stream(compressed, dest)
            unpacked_size = unpacked_path.stat().st_size
            logger.success(f" Распаковано: {sizeof_fmt(unpacked_size)}")

            # --- Подсчёт строк ---
            logger.info("🧮 Подсчёт строк в распакованном файле...")
            with open(unpacked_path, "rb") as f:
                line_count = sum(1 for _ in f)
            logger.success(f" Файл содержит {line_count:,} строк")

            # --- Итог ---
            logger.info("📈 ИТОГИ:")
            logger.info(f"  Архив:        {sizeof_fmt(archive_size)}")
            logger.info(f"  Распаковано:  {sizeof_fmt(unpacked_size)}")
            logger.info(f"  Коэффициент сжатия: {archive_size / unpacked_size:.2f}x")
            logger.info(f"  Строк:        {line_count:,}")
        except Exception as e:
            logger.exception(f"❌ Ошибка при распаковке: {e}")

        logger.info("🏁 Все тесты завершены успешно!")

    except httpx.ConnectError as e:
        logger.critical(f"❌ Не удалось подключиться к серверу. Убедитесь, что сервис запущен. {e}")
    except Exception as e:
        logger.exception(f"❌ Неожиданная ошибка: {e}")

if name == "main": asyncio.run(main())

А от код сервиса, который отдает данные OPU -

а в отдающем сервисе OPU такой код - service\src\gmap2\models\base.py - """ Модуль базовых моделей для ORM.

Содержит базовый класс и миксин для динамического определения имён таблиц на основе конфигурации приложения. """

from sqlalchemy.orm import declarative_base, declared_attr

from gmap2.config import APP_CONFIG

TABLE_NAME_MAPPING = { "opudata": APP_CONFIG.gp.opu_table, "lprnewsdata": APP_CONFIG.gp.lpr_news_table, }

BASE = declarative_base()

class DynamicTableMixin: # pylint: disable=too-few-public-methods """ Миксин для динамического определения имени таблицы через tablename.

Имя таблицы определяется по имени класса в нижнем регистре.
Если имя класса присутствует в TABLE_NAME_MAPPING, используется значение из маппинга.
В противном случае используется имя класса как имя таблицы.
"""

@declared_attr
def __tablename__(cls) -> str:  # pylint: disable=no-self-argument
    """
    Динамически возвращает имя таблицы для модели.

    Использует маппинг из конфигурации, если имя класса известно.
    В противном случае возвращает имя класса в нижнем регистре.

    :return: Имя таблицы, используемое SQLAlchemy при работе с моделью.
    """
    class_name = cls.__name__.lower()
    return TABLE_NAME_MAPPING.get(class_name, class_name)

service\src\gmap2\models\opu.py - """ ORM-модель для таблицы OPU.

Замечание: В БД отсутствует PRIMARY KEY. Для корректной работы SQLAlchemy используется виртуальный составной первичный ключ (wf_row_id, wf_load_id). Это не влияет на экспорт, но требуется для итерации через ORM. """

from sqlalchemy import Column, Date, DateTime, Float, Integer, String

from .base import BASE, DynamicTableMixin

class OPUData(DynamicTableMixin, BASE): # pylint: disable=too-few-public-methods """ Данные из таблицы brief_opu. """

__table_args__ = {"schema": "brief_opu_schema"}

wf_row_id = Column(Integer, primary_key=True, nullable=False)
wf_load_id = Column(Integer, primary_key=True, nullable=False)

object_id = Column(String)
object_nm = Column(String)
desk_nm = Column(String)
object_tp = Column(String)
object_unit = Column(String)
actdate = Column(Date)
layer_cd = Column(String)
layer_nm = Column(String)
measure = Column(String)
opu_cd = Column(String)
opu_nm_sh = Column(String)
opu_nm = Column(String)
opu_lvl = Column(Integer)
product_nm = Column(String)
opu_prnt_cd = Column(String)
opu_prnt_nm_sh = Column(String)
opu_prnt_nm = Column(String)
product_prnt_nm = Column(String)
sum_amountrub_p_usd = Column(Float)
wf_load_dttm = Column(DateTime)

service\src\gmap2\repositories\base.py - """ Базовый репозиторий с общим функционалом стриминга. """

from collections.abc import AsyncGenerator, Callable from typing import Generic, List, Optional, TypeVar, Union from typing_extensions import Self

from loguru import logger from sqlalchemy import Select, select from sqlalchemy.ext.asyncio import AsyncSession

_T_Model = TypeVar("_T_Model") # pylint: disable=invalid-name

class BaseRepository(Generic[_T_Model]): """ Абстрактный репозиторий с поддержкой стриминга. """

model: type[_T_Model]
default_order_by: Union[List[Callable[[], List]], List] = []

def __init__(self, session: AsyncSession) -> None:
    """
    Инициализирует репозиторий.

    Args:
        session: Асинхронная сессия SQLAlchemy.
    """
    self.session = session

async def stream_all(
    self,
    chunk_size: int = 10_000,
    statement: Optional[Select[tuple[_T_Model]]] = None,
) -> AsyncGenerator[list[_T_Model], None]:
    """
    Потоково извлекает записи из БД пачками.

    Выполняет стриминг данных с использованием асинхронного курсора.
    Поддерживает кастомный запрос и автоматическое упорядочивание.

    :param chunk_size: Размер пачки записей для одной итерации.
    :param statement: Опциональный SQL-запрос.
    :yield: Список экземпляров модели, загруженных порциями.
    """
    logger.info(f"Streaming {self.model.__name__} in batches of {chunk_size}")

    stmt = statement or select(self.model)

    if not statement and self.default_order_by:
        stmt = stmt.order_by(*self.default_order_by)

    try:
        result = await self.session.stream(
            stmt.execution_options(
                stream_results=True,
                max_row_count=chunk_size,
            )
        )

        partitions_method = result.partitions
        partitions_result = partitions_method(chunk_size)

        async for partition in partitions_result:
            items = [row[0] for row in partition]
            yield items

        logger.info(f"Finished streaming {self.model.__name__}")
    except Exception as e:
        logger.error(
            f"Error during streaming {self.model.__name__}: {type(e).__name__}: {e}"
        )
        raise

@classmethod
def create(cls, session: AsyncSession) -> Self:
    """
    Фабричный метод.

    Args:
        session: Асинхронная сессия.

    Returns:
        Экземпляр репозитория.
    """
    return cls(session=session)

service\src\gmap2\repositories\opu_repository.py - """ Репозиторий для OPU. """

from gmap2.models.opu import OPUData

from .base import BaseRepository

class OPURepository(BaseRepository[OPUData]): """ Репозиторий для OPUData. """

model = OPUData
default_order_by = [
    OPUData.wf_load_id,
    OPUData.wf_row_id,
]

service\src\gmap2\api\v1\routes.py - """ Маршруты API для запуска, статуса и скачивания экспорта. """

from fastapi import APIRouter, BackgroundTasks, HTTPException, Request

from gmap2.services.job.background_worker import ( run_lpr_news_export_job, run_opu_export_job, ) from gmap2.services.job.job_manager import JobManager from gmap2.utils.file_utils import create_range_aware_response

from .schemas import ExportJobStatus, StartExportResponse

router = APIRouter(prefix="/export", tags=["export"])

def get_job_manager(request: Request) -> JobManager: """ Извлекает JobManager из application state. """ return request.app.state.job_manager

@router.post("/opu/start", response_model=StartExportResponse) async def start_opu_export( request: Request, background_tasks: BackgroundTasks ) -> StartExportResponse: """ Запускает фоновую задачу экспорта данных OPU.

Returns:
    Идентификатор задачи.
"""
job_manager = get_job_manager(request)
job_id = job_manager.start_job("opu")

background_tasks.add_task(run_opu_export_job, job_id, job_manager)

return StartExportResponse(job_id=job_id)

@router.post("/lpr-news/start", response_model=StartExportResponse) async def start_lpr_news_export( request: Request, background_tasks: BackgroundTasks ) -> StartExportResponse: """ Запускает фоновую задачу экспорта данных LPR News.

Returns:
    Идентификатор задачи.
"""
job_manager = get_job_manager(request)
job_id = job_manager.start_job("lpr_news")

background_tasks.add_task(run_lpr_news_export_job, job_id, job_manager)

return StartExportResponse(job_id=job_id)

@router.get("/{job_id}/status", response_model=ExportJobStatus) async def get_export_status(job_id: str, request: Request) -> ExportJobStatus: """ Возвращает текущий статус задачи экспорта.

Args:
    job_id: Идентификатор задачи.

Returns:
    Статус задачи.

Raises:
    HTTPException 404: Если задача не найдена.
"""
job_manager = get_job_manager(request)
status = job_manager.get_job_status(job_id)
if not status:
    raise HTTPException(status_code=404, detail="Job not found")
return ExportJobStatus(**status)

@router.get("/{job_id}/download") async def download_export(job_id: str, request: Request): """ Стримит сжатый файл экспорта клиенту с поддержкой Range-запросов. """ job_manager = get_job_manager(request) status = job_manager.get_job_status(job_id)

if not status:
    raise HTTPException(status_code=404, detail="Job not found")

if status["status"] != "completed":
    raise HTTPException(
        status_code=409, detail=f"Job is {status['status']}, not completed"
    )

file_path = status.get("temp_file_path")
if not file_path or not file_path.endswith(".jsonl.zst"):
    raise HTTPException(status_code=500, detail="Export file not available")

filename = f"export_{job_id}.jsonl.zst"

return await create_range_aware_response(
    file_path=file_path,
    filename=filename,
    request=request,
    media_type="application/octet-stream",
)

service\src\gmap2\services\export\export_service.py - """ Сервис экспорта данных: объединяет репозиторий, форматирование и сжатие. Формат: JSON Lines + Zstandard (.jsonl.zst), один непрерывный zstd-фрейм. """

from future import annotations

import asyncio import math import os import tempfile import threading from collections.abc import AsyncGenerator, Callable from contextlib import asynccontextmanager from datetime import date, datetime from typing import Any, Tuple

import aiofiles import zstandard as zstd from loguru import logger from orjson import OPT_NAIVE_UTC, OPT_UTC_Z # pylint: disable=no-name-in-module from orjson import dumps as orjson_dumps # pylint: disable=no-name-in-module from sqlalchemy.ext.asyncio import AsyncSession

from gmap2.repositories import LPRNewsRepository, OPURepository

from ..job.job_manager import JobManager from .formatters import models_to_dicts

class _ZstdAsyncSink: """ Потокобезопасный приёмник для zstd.stream_writer.

Собирает сжатые данные по чанкам в памяти с использованием блокировки.
Предназначен для использования в асинхронном контексте,
где сжатие выполняется в отдельном потоке.
"""

__slots__ = ("_chunks", "_lock")

def __init__(self) -> None:
    """
    Инициализирует приёмник с пустым списком чанков и блокировкой.
    """
    self._chunks: list[bytes] = []
    self._lock = threading.Lock()

def write(self, b: bytes) -> int:
    """
    Записывает байтовый фрагмент в буфер.

    Метод потокобезопасен.
    Копирует данные и добавляет в внутренний список чанков.

    :param b: Байтовые данные для записи.
    :return: Длина записанных данных.
    """
    with self._lock:
        self._chunks.append(bytes(b))
    return len(b)

def drain(self) -> list[bytes]:
    """
    Извлекает все накопленные чанки, сбрасывая внутренний буфер.

    Метод потокобезопасен.
    Возвращает список всех чанков, записанных с момента последнего сброса.

    :return: Список байтовых фрагментов.
    """
    with self._lock:
        chunks = self._chunks
        self._chunks = []
    return chunks

class ExportService: """ Основной сервис экспорта данных в формате JSON Lines + zstd. """

def __init__(self, chunk_size: int = 10_000, zstd_level: int = 3) -> None:
    self.chunk_size = chunk_size
    self.zstd_level = zstd_level

@asynccontextmanager
async def _export_to_zstd(  # pylint: disable=too-many-arguments,too-many-locals
    self,
    *,
    session: AsyncSession,
    job_id: str,
    job_manager: JobManager,
    repo_factory: Callable[[AsyncSession], Any],
    label: str,
    temp_dir: str | None = None,
) -> AsyncGenerator[Tuple[AsyncGenerator[bytes, None], str], None]:
    repo = repo_factory(session)
    file_path: str | None = None
    try:
        with tempfile.NamedTemporaryFile(
            delete=False, suffix=".jsonl.zst", dir=temp_dir
        ) as tmp:
            file_path = tmp.name

        logger.info(f"[export] {label}: start -> {file_path}")

        cctx = zstd.ZstdCompressor(level=self.zstd_level)
        sink = _ZstdAsyncSink()

        async with aiofiles.open(file_path, "wb") as f:
            writer = cctx.stream_writer(sink)
            try:
                async for batch in repo.stream_all(chunk_size=self.chunk_size):
                    if not batch:
                        continue
                    dicts = models_to_dicts(batch)
                    payload = ("\n".join(_dumps(d) for d in dicts) + "\n").encode(
                        "utf-8"
                    )

                    await asyncio.to_thread(writer.write, payload)

                    for chunk in sink.drain():
                        await f.write(chunk)

                    job_manager.increment_rows(job_id, len(batch))

                await asyncio.to_thread(writer.flush, zstd.FLUSH_FRAME)

                for chunk in sink.drain():
                    await f.write(chunk)
            finally:
                await asyncio.to_thread(writer.close)
                for chunk in sink.drain():
                    await f.write(chunk)
                await f.flush()

        logger.info(f"[export] {label}: done -> {file_path}")
        yield _stream_file(file_path), file_path

    except Exception:
        if file_path and os.path.exists(file_path):
            await asyncio.to_thread(os.remove, file_path)
        logger.exception(f"[export] {label}: failed, temporary file removed")
        raise

@asynccontextmanager
async def export_opu_to_zstd(
    self,
    session: AsyncSession,
    job_id: str,
    job_manager: JobManager,
    temp_dir: str | None = None,
) -> AsyncGenerator[Tuple[AsyncGenerator[bytes, None], str], None]:
    """
    Экспорт OPU в один непрерывный zstd-поток.
    """
    async with self._export_to_zstd(
        session=session,
        job_id=job_id,
        job_manager=job_manager,
        repo_factory=OPURepository,
        label="OPU",
        temp_dir=temp_dir,
    ) as ctx:
        yield ctx

@asynccontextmanager
async def export_lpr_news_to_zstd(
    self,
    session: AsyncSession,
    job_id: str,
    job_manager: JobManager,
    temp_dir: str | None = None,
) -> AsyncGenerator[Tuple[AsyncGenerator[bytes, None], str], None]:
    """
    Экспорт LPR News в один непрерывный zstd-поток.
    """
    async with self._export_to_zstd(
        session=session,
        job_id=job_id,
        job_manager=job_manager,
        repo_factory=LPRNewsRepository,
        label="LPR News",
        temp_dir=temp_dir,
    ) as ctx:
        yield ctx

def _dumps(data: dict) -> str: """ Сериализует словарь в JSON-строку (orjson). """

return orjson_dumps(
    data,
    default=_serialize_value,
    option=OPT_UTC_Z | OPT_NAIVE_UTC,
).decode("utf-8")

def _serialize_value(value: Any) -> Any: """ Преобразует значения к JSON-совместимому виду. """ if isinstance(value, (datetime, date)): return value.isoformat() if isinstance(value, float) and not math.isfinite(value): return None return value

async def _stream_file( file_path: str, chunk_size: int = 8192 ) -> AsyncGenerator[bytes, None]: """ Асинхронно читает файл блоками. """ async with aiofiles.open(file_path, "rb") as f: while chunk := await f.read(chunk_size): yield chunk

service\src\gmap2\services\export\compressors.py - """ Работа со сжатием данных с использованием zstandard. """

from typing import BinaryIO

import zstandard as zstd

def create_zstd_writer(fileobj: BinaryIO, level: int = 3) -> zstd.ZstdCompressionWriter: """ Создаёт сжатый writer поверх бинарного файла.

Args:
    fileobj: Целевой файл (например, tempfile).
    level: Уровень сжатия (110). По умолчанию 3 - баланс скорости и размера.

Returns:
    Объект для записи сжатых данных.
"""
cctx = zstd.ZstdCompressor(level=level)
return cctx.stream_writer(fileobj)

service\src\gmap2\services\export\formatters.py - """ Форматирование ORM-объектов в словари. С оптимизацией: кеширование структуры модели. """

from datetime import date, datetime from decimal import Decimal from typing import Any, Dict, List from weakref import WeakKeyDictionary

from sqlalchemy import inspect from sqlalchemy.orm import RelationshipProperty

_columns_cache: WeakKeyDictionary = WeakKeyDictionary()

def _get_model_columns(model_instance: Any) -> List[str]: """ Возвращает список имен колонок модели (без отношений). Кеширует результат. """ model_class = model_instance.class if model_class not in _columns_cache: mapper = inspect(model_class) columns = [ attr.key for attr in mapper.attrs if not isinstance(attr, RelationshipProperty) ] _columns_cache[model_class] = columns return _columns_cache[model_class]

def _serialize_value(value: Any) -> Any: """ Сериализует значение в JSON-совместимый формат. """ if isinstance(value, (datetime, date)): return value.isoformat() if isinstance(value, Decimal): return float(value) if value is None: return None return value

def models_to_dicts(instances: List[Any]) -> List[Dict[str, Any]]: """ Массовое преобразование списка ORM-объектов в словари.

Использует кеширование структуры модели для высокой производительности.

Args:
    instances: Список ORM-объектов.

Returns:
    Список словарей.
"""
if not instances:
    return []

columns = _get_model_columns(instances[0])
return [
    {col: _serialize_value(getattr(obj, col)) for col in columns}
    for obj in instances
]

all = [ "models_to_dicts", ]

service\src\gmap2\services\job\background_worker.py - """ Фоновые задачи экспорта. """

from psycopg import errors as pg_errors from loguru import logger from sqlalchemy.exc import DatabaseError, OperationalError from tenacity import ( retry, retry_if_exception_type, stop_after_attempt, wait_exponential, )

from gmap2.context import APP_CTX from gmap2.services.export.export_service import ExportService

from typing import Optional

TRANSIENT_ERRORS = ( OperationalError, DatabaseError, ConnectionError, TimeoutError, pg_errors.ConnectionException, pg_errors.AdminShutdown, pg_errors.CannotConnectNow, )

def _create_export_job( export_method_name: str, job_type: str, ): """ Фабрика фоновых задач экспорта.

Args:
    export_method_name: Название метода экспорта в ExportService.
    job_type: Тип задачи ("opu", "lpr_news").

Returns:
    Асинхронная функция-задача.
"""

@retry(
    stop=stop_after_attempt(10),
    wait=wait_exponential(multiplier=1, max=60),
    retry=retry_if_exception_type(TRANSIENT_ERRORS),
    reraise=True,
)
async def run_export_job(
    job_id: str, job_manager, temp_dir: Optional[str] = None
) -> None:
    export_service = ExportService()
    try:
        async with APP_CTX.get_db_session() as session:
            job_manager.mark_running(job_id)

            export_method = getattr(export_service, export_method_name)
            async with export_method(
                session=session,
                job_id=job_id,
                job_manager=job_manager,
                temp_dir=temp_dir,
            ) as (stream, file_path):
                async for _ in stream:
                    pass

            job_manager.mark_completed(job_id, file_path)

    except Exception as e:
        if not isinstance(e, TRANSIENT_ERRORS):
            job_manager.mark_failed(job_id, f"{type(e).__name__}: {e}")
        logger.exception(f"Export job {job_id} ({job_type}) failed")
        raise

run_export_job.__name__ = f"run_{job_type}_export_job"
run_export_job.__doc__ = f"Фоновая задача экспорта {job_type.upper()} с retry."

return run_export_job

run_opu_export_job = _create_export_job("export_opu_to_zstd", "opu") run_lpr_news_export_job = _create_export_job("export_lpr_news_to_zstd", "lpr_news")