про бд для OPU - нужно полученные строки грузить также по курсору стримингом в бд, но перед этим надо truncate таблицу делать, DDL вот - Это тестовый скрипт для взаимодействия с OPU Но тут надо сделать модель репозиторий, интерфейс и т.п, в скрипте показано только принцип получения данных. # test_export.py """ Скрипт для тестирования экспорта: - запуск задачи - мониторинг статуса - скачивание (полное и по частям) - распаковка - подсчёт строк - замер размеров """ import asyncio import httpx import zstandard as zstd from loguru import logger from pathlib import Path # ========= НАСТРОЙКИ ========= BASE_URL = "https://ci02533826-tib-brief.apps.ift-terra000024-edm.ocp.delta.sbrf.ru " ENDPOINT_START = "/export/opu/start" ENDPOINT_STATUS = "/export/{job_id}/status" ENDPOINT_DOWNLOAD = "/export/{job_id}/download" POLL_INTERVAL = 2 TIMEOUT = 3600 CHUNK_SIZE = 8192 OUTPUT_DIR = Path("./export_test") OUTPUT_DIR.mkdir(exist_ok=True) # ============================ def sizeof_fmt(num: int) -> str: """Форматирует размер файла в человекочитаемый вид.""" for unit in ['B', 'KB', 'MB', 'GB']: if num < 1024.0: return f"{num:.1f} {unit}" num /= 1024.0 return f"{num:.1f} TB" async def download_full(client: httpx.AsyncClient, url: str, filepath: Path) -> bool: """Скачивает файл полностью.""" logger.info(f"⬇️ Полная загрузка: {filepath.name}") try: response = await client.get(url, follow_redirects=True) if response.status_code != 200: logger.error(f"❌ Ошибка полной загрузки: {response.status_code} {response.text}") return False with open(filepath, "wb") as f: f.write(response.content) logger.success(f" Полная загрузка завершена: {sizeof_fmt(filepath.stat().st_size)}") return True except Exception as e: logger.error(f"❌ Ошибка при полной загрузке: {e}") return False async def download_range( client: httpx.AsyncClient, url: str, filepath: Path, start: int, end: int | None = None ) -> bool: """Скачивает диапазон байтов.""" headers = {"Range": f"bytes={start}-{end if end else ''}"} logger.info(f"⬇️ Загрузка диапазона {start}-{end if end else 'end'} -> {filepath.name}") try: response = await client.get(url, headers=headers, follow_redirects=True) if response.status_code != 206: logger.error(f"❌ Ожидался 206 Partial Content, получен: {response.status_code}") return False with open(filepath, "wb") as f: f.write(response.content) logger.success(f" Диапазон сохранён: {sizeof_fmt(filepath.stat().st_size)} байт") return True except Exception as e: logger.error(f"❌ Ошибка при загрузке диапазона: {e}") return False async def main(): logger.info("🚀 Запуск теста экспорта данных") cert_path = r"C:\Users\23193453\Documents\code\cert\client_cert.pem" key_path = r"C:\Users\23193453\Documents\code\cert\client_cert.key" ca_path = r"C:\Users\23193453\Documents\code\cert\client_ca.pem" async with httpx.AsyncClient( cert=(cert_path, key_path), verify=ca_path, timeout=TIMEOUT ) as client: try: # --- Шаг 1: Запуск задачи --- logger.info("📨 Отправка запроса на запуск экспорта OPU...") response = await client.post(BASE_URL + ENDPOINT_START) if response.status_code != 200: logger.error(f"❌ Ошибка запуска задачи: {response.status_code} {response.text}") return job_id = response.json()["job_id"] logger.info(f" Задача запущена: job_id={job_id}") # --- Шаг 2: Мониторинг статуса --- status_url = ENDPOINT_STATUS.format(job_id=job_id) logger.info("⏳ Ожидание завершения задачи...") start_wait = asyncio.get_event_loop().time() while True: response = await client.get(BASE_URL + status_url) if response.status_code != 200: logger.warning(f"⚠️ Ошибка при получении статуса: {response.status_code}") await asyncio.sleep(POLL_INTERVAL) continue status_data = response.json() status = status_data["status"] total_rows = status_data["total_rows"] elapsed = asyncio.get_event_loop().time() - start_wait logger.debug(f"📊 Статус: {status}, строк обработано: {total_rows}, прошло: {elapsed:.1f} с") if status == "completed": logger.info(f"🎉 Задача завершена! Обработано строк: {total_rows}") break elif status == "failed": logger.error(f"💥 Задача завершилась с ошибкой: {status_data['error']}") return elif status in ("pending", "running"): await asyncio.sleep(POLL_INTERVAL) continue download_url = BASE_URL + ENDPOINT_DOWNLOAD.format(job_id=job_id) # --- Тест 1: Полная загрузка --- full_path = OUTPUT_DIR / f"full_export_{job_id}.jsonl.zst" if not await download_full(client, download_url, full_path): return # --- Тест 2: Первые 1024 байта --- range_path = OUTPUT_DIR / f"range_head_{job_id}.bin" if not await download_range(client, download_url, range_path, start=0, end=1023): return # --- Тест 3: Возобновление с 1024 байта --- resume_path = OUTPUT_DIR / f"range_resume_{job_id}.bin" if not await download_range(client, download_url, resume_path, start=1024): return # --- Анализ полного архива --- archive_size = full_path.stat().st_size logger.success(f"📦 Полный архив: {sizeof_fmt(archive_size)}") # --- Распаковка --- unpacked_path = OUTPUT_DIR / f"export_{job_id}.jsonl" logger.info(f"📦 Распаковка архива в: {unpacked_path.name}") dctx = zstd.ZstdDecompressor() try: with open(full_path, "rb") as compressed: with open(unpacked_path, "wb") as dest: dctx.copy_stream(compressed, dest) unpacked_size = unpacked_path.stat().st_size logger.success(f" Распаковано: {sizeof_fmt(unpacked_size)}") # --- Подсчёт строк --- logger.info("🧮 Подсчёт строк в распакованном файле...") with open(unpacked_path, "rb") as f: line_count = sum(1 for _ in f) logger.success(f" Файл содержит {line_count:,} строк") # --- Итог --- logger.info("📈 ИТОГИ:") logger.info(f" Архив: {sizeof_fmt(archive_size)}") logger.info(f" Распаковано: {sizeof_fmt(unpacked_size)}") logger.info(f" Коэффициент сжатия: {archive_size / unpacked_size:.2f}x") logger.info(f" Строк: {line_count:,}") except Exception as e: logger.exception(f"❌ Ошибка при распаковке: {e}") logger.info("🏁 Все тесты завершены успешно!") except httpx.ConnectError as e: logger.critical(f"❌ Не удалось подключиться к серверу. Убедитесь, что сервис запущен. {e}") except Exception as e: logger.exception(f"❌ Неожиданная ошибка: {e}") if __name__ == "__main__": asyncio.run(main()) А от код сервиса, который отдает данные OPU - а в отдающем сервисе OPU такой код - service\src\gmap2\models\base.py - """ Модуль базовых моделей для ORM. Содержит базовый класс и миксин для динамического определения имён таблиц на основе конфигурации приложения. """ from sqlalchemy.orm import declarative_base, declared_attr from gmap2.config import APP_CONFIG TABLE_NAME_MAPPING = { "opudata": APP_CONFIG.gp.opu_table, "lprnewsdata": APP_CONFIG.gp.lpr_news_table, } BASE = declarative_base() class DynamicTableMixin: # pylint: disable=too-few-public-methods """ Миксин для динамического определения имени таблицы через __tablename__. Имя таблицы определяется по имени класса в нижнем регистре. Если имя класса присутствует в TABLE_NAME_MAPPING, используется значение из маппинга. В противном случае используется имя класса как имя таблицы. """ @declared_attr def __tablename__(cls) -> str: # pylint: disable=no-self-argument """ Динамически возвращает имя таблицы для модели. Использует маппинг из конфигурации, если имя класса известно. В противном случае возвращает имя класса в нижнем регистре. :return: Имя таблицы, используемое SQLAlchemy при работе с моделью. """ class_name = cls.__name__.lower() return TABLE_NAME_MAPPING.get(class_name, class_name) service\src\gmap2\models\opu.py - """ ORM-модель для таблицы OPU. Замечание: В БД отсутствует PRIMARY KEY. Для корректной работы SQLAlchemy используется виртуальный составной первичный ключ (wf_row_id, wf_load_id). Это не влияет на экспорт, но требуется для итерации через ORM. """ from sqlalchemy import Column, Date, DateTime, Float, Integer, String from .base import BASE, DynamicTableMixin class OPUData(DynamicTableMixin, BASE): # pylint: disable=too-few-public-methods """ Данные из таблицы brief_opu. """ __table_args__ = {"schema": "brief_opu_schema"} wf_row_id = Column(Integer, primary_key=True, nullable=False) wf_load_id = Column(Integer, primary_key=True, nullable=False) object_id = Column(String) object_nm = Column(String) desk_nm = Column(String) object_tp = Column(String) object_unit = Column(String) actdate = Column(Date) layer_cd = Column(String) layer_nm = Column(String) measure = Column(String) opu_cd = Column(String) opu_nm_sh = Column(String) opu_nm = Column(String) opu_lvl = Column(Integer) product_nm = Column(String) opu_prnt_cd = Column(String) opu_prnt_nm_sh = Column(String) opu_prnt_nm = Column(String) product_prnt_nm = Column(String) sum_amountrub_p_usd = Column(Float) wf_load_dttm = Column(DateTime) service\src\gmap2\repositories\base.py - """ Базовый репозиторий с общим функционалом стриминга. """ from collections.abc import AsyncGenerator, Callable from typing import Generic, List, Optional, TypeVar, Union from typing_extensions import Self from loguru import logger from sqlalchemy import Select, select from sqlalchemy.ext.asyncio import AsyncSession _T_Model = TypeVar("_T_Model") # pylint: disable=invalid-name class BaseRepository(Generic[_T_Model]): """ Абстрактный репозиторий с поддержкой стриминга. """ model: type[_T_Model] default_order_by: Union[List[Callable[[], List]], List] = [] def __init__(self, session: AsyncSession) -> None: """ Инициализирует репозиторий. Args: session: Асинхронная сессия SQLAlchemy. """ self.session = session async def stream_all( self, chunk_size: int = 10_000, statement: Optional[Select[tuple[_T_Model]]] = None, ) -> AsyncGenerator[list[_T_Model], None]: """ Потоково извлекает записи из БД пачками. Выполняет стриминг данных с использованием асинхронного курсора. Поддерживает кастомный запрос и автоматическое упорядочивание. :param chunk_size: Размер пачки записей для одной итерации. :param statement: Опциональный SQL-запрос. :yield: Список экземпляров модели, загруженных порциями. """ logger.info(f"Streaming {self.model.__name__} in batches of {chunk_size}") stmt = statement or select(self.model) if not statement and self.default_order_by: stmt = stmt.order_by(*self.default_order_by) try: result = await self.session.stream( stmt.execution_options( stream_results=True, max_row_count=chunk_size, ) ) partitions_method = result.partitions partitions_result = partitions_method(chunk_size) async for partition in partitions_result: items = [row[0] for row in partition] yield items logger.info(f"Finished streaming {self.model.__name__}") except Exception as e: logger.error( f"Error during streaming {self.model.__name__}: {type(e).__name__}: {e}" ) raise @classmethod def create(cls, session: AsyncSession) -> Self: """ Фабричный метод. Args: session: Асинхронная сессия. Returns: Экземпляр репозитория. """ return cls(session=session) service\src\gmap2\repositories\opu_repository.py - """ Репозиторий для OPU. """ from gmap2.models.opu import OPUData from .base import BaseRepository class OPURepository(BaseRepository[OPUData]): """ Репозиторий для OPUData. """ model = OPUData default_order_by = [ OPUData.wf_load_id, OPUData.wf_row_id, ] service\src\gmap2\api\v1\routes.py - """ Маршруты API для запуска, статуса и скачивания экспорта. """ from fastapi import APIRouter, BackgroundTasks, HTTPException, Request from gmap2.services.job.background_worker import ( run_lpr_news_export_job, run_opu_export_job, ) from gmap2.services.job.job_manager import JobManager from gmap2.utils.file_utils import create_range_aware_response from .schemas import ExportJobStatus, StartExportResponse router = APIRouter(prefix="/export", tags=["export"]) def get_job_manager(request: Request) -> JobManager: """ Извлекает JobManager из application state. """ return request.app.state.job_manager @router.post("/opu/start", response_model=StartExportResponse) async def start_opu_export( request: Request, background_tasks: BackgroundTasks ) -> StartExportResponse: """ Запускает фоновую задачу экспорта данных OPU. Returns: Идентификатор задачи. """ job_manager = get_job_manager(request) job_id = job_manager.start_job("opu") background_tasks.add_task(run_opu_export_job, job_id, job_manager) return StartExportResponse(job_id=job_id) @router.post("/lpr-news/start", response_model=StartExportResponse) async def start_lpr_news_export( request: Request, background_tasks: BackgroundTasks ) -> StartExportResponse: """ Запускает фоновую задачу экспорта данных LPR News. Returns: Идентификатор задачи. """ job_manager = get_job_manager(request) job_id = job_manager.start_job("lpr_news") background_tasks.add_task(run_lpr_news_export_job, job_id, job_manager) return StartExportResponse(job_id=job_id) @router.get("/{job_id}/status", response_model=ExportJobStatus) async def get_export_status(job_id: str, request: Request) -> ExportJobStatus: """ Возвращает текущий статус задачи экспорта. Args: job_id: Идентификатор задачи. Returns: Статус задачи. Raises: HTTPException 404: Если задача не найдена. """ job_manager = get_job_manager(request) status = job_manager.get_job_status(job_id) if not status: raise HTTPException(status_code=404, detail="Job not found") return ExportJobStatus(**status) @router.get("/{job_id}/download") async def download_export(job_id: str, request: Request): """ Стримит сжатый файл экспорта клиенту с поддержкой Range-запросов. """ job_manager = get_job_manager(request) status = job_manager.get_job_status(job_id) if not status: raise HTTPException(status_code=404, detail="Job not found") if status["status"] != "completed": raise HTTPException( status_code=409, detail=f"Job is {status['status']}, not completed" ) file_path = status.get("temp_file_path") if not file_path or not file_path.endswith(".jsonl.zst"): raise HTTPException(status_code=500, detail="Export file not available") filename = f"export_{job_id}.jsonl.zst" return await create_range_aware_response( file_path=file_path, filename=filename, request=request, media_type="application/octet-stream", ) service\src\gmap2\services\export\export_service.py - """ Сервис экспорта данных: объединяет репозиторий, форматирование и сжатие. Формат: JSON Lines + Zstandard (.jsonl.zst), один непрерывный zstd-фрейм. """ from __future__ import annotations import asyncio import math import os import tempfile import threading from collections.abc import AsyncGenerator, Callable from contextlib import asynccontextmanager from datetime import date, datetime from typing import Any, Tuple import aiofiles import zstandard as zstd from loguru import logger from orjson import OPT_NAIVE_UTC, OPT_UTC_Z # pylint: disable=no-name-in-module from orjson import dumps as orjson_dumps # pylint: disable=no-name-in-module from sqlalchemy.ext.asyncio import AsyncSession from gmap2.repositories import LPRNewsRepository, OPURepository from ..job.job_manager import JobManager from .formatters import models_to_dicts class _ZstdAsyncSink: """ Потокобезопасный приёмник для zstd.stream_writer. Собирает сжатые данные по чанкам в памяти с использованием блокировки. Предназначен для использования в асинхронном контексте, где сжатие выполняется в отдельном потоке. """ __slots__ = ("_chunks", "_lock") def __init__(self) -> None: """ Инициализирует приёмник с пустым списком чанков и блокировкой. """ self._chunks: list[bytes] = [] self._lock = threading.Lock() def write(self, b: bytes) -> int: """ Записывает байтовый фрагмент в буфер. Метод потокобезопасен. Копирует данные и добавляет в внутренний список чанков. :param b: Байтовые данные для записи. :return: Длина записанных данных. """ with self._lock: self._chunks.append(bytes(b)) return len(b) def drain(self) -> list[bytes]: """ Извлекает все накопленные чанки, сбрасывая внутренний буфер. Метод потокобезопасен. Возвращает список всех чанков, записанных с момента последнего сброса. :return: Список байтовых фрагментов. """ with self._lock: chunks = self._chunks self._chunks = [] return chunks class ExportService: """ Основной сервис экспорта данных в формате JSON Lines + zstd. """ def __init__(self, chunk_size: int = 10_000, zstd_level: int = 3) -> None: self.chunk_size = chunk_size self.zstd_level = zstd_level @asynccontextmanager async def _export_to_zstd( # pylint: disable=too-many-arguments,too-many-locals self, *, session: AsyncSession, job_id: str, job_manager: JobManager, repo_factory: Callable[[AsyncSession], Any], label: str, temp_dir: str | None = None, ) -> AsyncGenerator[Tuple[AsyncGenerator[bytes, None], str], None]: repo = repo_factory(session) file_path: str | None = None try: with tempfile.NamedTemporaryFile( delete=False, suffix=".jsonl.zst", dir=temp_dir ) as tmp: file_path = tmp.name logger.info(f"[export] {label}: start -> {file_path}") cctx = zstd.ZstdCompressor(level=self.zstd_level) sink = _ZstdAsyncSink() async with aiofiles.open(file_path, "wb") as f: writer = cctx.stream_writer(sink) try: async for batch in repo.stream_all(chunk_size=self.chunk_size): if not batch: continue dicts = models_to_dicts(batch) payload = ("\n".join(_dumps(d) for d in dicts) + "\n").encode( "utf-8" ) await asyncio.to_thread(writer.write, payload) for chunk in sink.drain(): await f.write(chunk) job_manager.increment_rows(job_id, len(batch)) await asyncio.to_thread(writer.flush, zstd.FLUSH_FRAME) for chunk in sink.drain(): await f.write(chunk) finally: await asyncio.to_thread(writer.close) for chunk in sink.drain(): await f.write(chunk) await f.flush() logger.info(f"[export] {label}: done -> {file_path}") yield _stream_file(file_path), file_path except Exception: if file_path and os.path.exists(file_path): await asyncio.to_thread(os.remove, file_path) logger.exception(f"[export] {label}: failed, temporary file removed") raise @asynccontextmanager async def export_opu_to_zstd( self, session: AsyncSession, job_id: str, job_manager: JobManager, temp_dir: str | None = None, ) -> AsyncGenerator[Tuple[AsyncGenerator[bytes, None], str], None]: """ Экспорт OPU в один непрерывный zstd-поток. """ async with self._export_to_zstd( session=session, job_id=job_id, job_manager=job_manager, repo_factory=OPURepository, label="OPU", temp_dir=temp_dir, ) as ctx: yield ctx @asynccontextmanager async def export_lpr_news_to_zstd( self, session: AsyncSession, job_id: str, job_manager: JobManager, temp_dir: str | None = None, ) -> AsyncGenerator[Tuple[AsyncGenerator[bytes, None], str], None]: """ Экспорт LPR News в один непрерывный zstd-поток. """ async with self._export_to_zstd( session=session, job_id=job_id, job_manager=job_manager, repo_factory=LPRNewsRepository, label="LPR News", temp_dir=temp_dir, ) as ctx: yield ctx def _dumps(data: dict) -> str: """ Сериализует словарь в JSON-строку (orjson). """ return orjson_dumps( data, default=_serialize_value, option=OPT_UTC_Z | OPT_NAIVE_UTC, ).decode("utf-8") def _serialize_value(value: Any) -> Any: """ Преобразует значения к JSON-совместимому виду. """ if isinstance(value, (datetime, date)): return value.isoformat() if isinstance(value, float) and not math.isfinite(value): return None return value async def _stream_file( file_path: str, chunk_size: int = 8192 ) -> AsyncGenerator[bytes, None]: """ Асинхронно читает файл блоками. """ async with aiofiles.open(file_path, "rb") as f: while chunk := await f.read(chunk_size): yield chunk service\src\gmap2\services\export\compressors.py - """ Работа со сжатием данных с использованием zstandard. """ from typing import BinaryIO import zstandard as zstd def create_zstd_writer(fileobj: BinaryIO, level: int = 3) -> zstd.ZstdCompressionWriter: """ Создаёт сжатый writer поверх бинарного файла. Args: fileobj: Целевой файл (например, tempfile). level: Уровень сжатия (1–10). По умолчанию 3 - баланс скорости и размера. Returns: Объект для записи сжатых данных. """ cctx = zstd.ZstdCompressor(level=level) return cctx.stream_writer(fileobj) service\src\gmap2\services\export\formatters.py - """ Форматирование ORM-объектов в словари. С оптимизацией: кеширование структуры модели. """ from datetime import date, datetime from decimal import Decimal from typing import Any, Dict, List from weakref import WeakKeyDictionary from sqlalchemy import inspect from sqlalchemy.orm import RelationshipProperty _columns_cache: WeakKeyDictionary = WeakKeyDictionary() def _get_model_columns(model_instance: Any) -> List[str]: """ Возвращает список имен колонок модели (без отношений). Кеширует результат. """ model_class = model_instance.__class__ if model_class not in _columns_cache: mapper = inspect(model_class) columns = [ attr.key for attr in mapper.attrs if not isinstance(attr, RelationshipProperty) ] _columns_cache[model_class] = columns return _columns_cache[model_class] def _serialize_value(value: Any) -> Any: """ Сериализует значение в JSON-совместимый формат. """ if isinstance(value, (datetime, date)): return value.isoformat() if isinstance(value, Decimal): return float(value) if value is None: return None return value def models_to_dicts(instances: List[Any]) -> List[Dict[str, Any]]: """ Массовое преобразование списка ORM-объектов в словари. Использует кеширование структуры модели для высокой производительности. Args: instances: Список ORM-объектов. Returns: Список словарей. """ if not instances: return [] columns = _get_model_columns(instances[0]) return [ {col: _serialize_value(getattr(obj, col)) for col in columns} for obj in instances ] __all__ = [ "models_to_dicts", ] service\src\gmap2\services\job\background_worker.py - """ Фоновые задачи экспорта. """ from psycopg import errors as pg_errors from loguru import logger from sqlalchemy.exc import DatabaseError, OperationalError from tenacity import ( retry, retry_if_exception_type, stop_after_attempt, wait_exponential, ) from gmap2.context import APP_CTX from gmap2.services.export.export_service import ExportService from typing import Optional TRANSIENT_ERRORS = ( OperationalError, DatabaseError, ConnectionError, TimeoutError, pg_errors.ConnectionException, pg_errors.AdminShutdown, pg_errors.CannotConnectNow, ) def _create_export_job( export_method_name: str, job_type: str, ): """ Фабрика фоновых задач экспорта. Args: export_method_name: Название метода экспорта в ExportService. job_type: Тип задачи ("opu", "lpr_news"). Returns: Асинхронная функция-задача. """ @retry( stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, max=60), retry=retry_if_exception_type(TRANSIENT_ERRORS), reraise=True, ) async def run_export_job( job_id: str, job_manager, temp_dir: Optional[str] = None ) -> None: export_service = ExportService() try: async with APP_CTX.get_db_session() as session: job_manager.mark_running(job_id) export_method = getattr(export_service, export_method_name) async with export_method( session=session, job_id=job_id, job_manager=job_manager, temp_dir=temp_dir, ) as (stream, file_path): async for _ in stream: pass job_manager.mark_completed(job_id, file_path) except Exception as e: if not isinstance(e, TRANSIENT_ERRORS): job_manager.mark_failed(job_id, f"{type(e).__name__}: {e}") logger.exception(f"Export job {job_id} ({job_type}) failed") raise run_export_job.__name__ = f"run_{job_type}_export_job" run_export_job.__doc__ = f"Фоновая задача экспорта {job_type.upper()} с retry." return run_export_job run_opu_export_job = _create_export_job("export_opu_to_zstd", "opu") run_lpr_news_export_job = _create_export_job("export_lpr_news_to_zstd", "lpr_news")