From 0e888ec910f8e169db3de6dffb511e80068f3144 Mon Sep 17 00:00:00 2001 From: itqop Date: Wed, 5 Nov 2025 19:01:42 +0300 Subject: [PATCH] add pipeline --- PIPELINE_OPU.md | 926 ++++++++++++++++++++++++++++++++++ PIPELINE_TENERA.md | 1201 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 2127 insertions(+) create mode 100644 PIPELINE_OPU.md create mode 100644 PIPELINE_TENERA.md diff --git a/PIPELINE_OPU.md b/PIPELINE_OPU.md new file mode 100644 index 0000000..f313325 --- /dev/null +++ b/PIPELINE_OPU.md @@ -0,0 +1,926 @@ +про бд для OPU - нужно полученные строки грузить также по курсору стримингом в бд, но перед этим надо truncate таблицу делать, DDL вот - + + + + +Это тестовый скрипт для взаимодействия с OPU +Но тут надо сделать модель репозиторий, интерфейс и т.п, в скрипте показано только принцип получения данных. + +# test_export.py +""" +Скрипт для тестирования экспорта: +- запуск задачи +- мониторинг статуса +- скачивание (полное и по частям) +- распаковка +- подсчёт строк +- замер размеров +""" + +import asyncio +import httpx +import zstandard as zstd +from loguru import logger +from pathlib import Path + +# ========= НАСТРОЙКИ ========= +BASE_URL = "https://ci02533826-tib-brief.apps.ift-terra000024-edm.ocp.delta.sbrf.ru " +ENDPOINT_START = "/export/opu/start" +ENDPOINT_STATUS = "/export/{job_id}/status" +ENDPOINT_DOWNLOAD = "/export/{job_id}/download" + +POLL_INTERVAL = 2 +TIMEOUT = 3600 +CHUNK_SIZE = 8192 + +OUTPUT_DIR = Path("./export_test") +OUTPUT_DIR.mkdir(exist_ok=True) +# ============================ + + +def sizeof_fmt(num: int) -> str: + """Форматирует размер файла в человекочитаемый вид.""" + for unit in ['B', 'KB', 'MB', 'GB']: + if num < 1024.0: + return f"{num:.1f} {unit}" + num /= 1024.0 + return f"{num:.1f} TB" + + +async def download_full(client: httpx.AsyncClient, url: str, filepath: Path) -> bool: + """Скачивает файл полностью.""" + logger.info(f"⬇️ Полная загрузка: {filepath.name}") + try: + response = await client.get(url, follow_redirects=True) + if response.status_code != 200: + logger.error(f"❌ Ошибка полной загрузки: {response.status_code} {response.text}") + return False + with open(filepath, "wb") as f: + f.write(response.content) + logger.success(f" Полная загрузка завершена: {sizeof_fmt(filepath.stat().st_size)}") + return True + except Exception as e: + logger.error(f"❌ Ошибка при полной загрузке: {e}") + return False + + +async def download_range( + client: httpx.AsyncClient, + url: str, + filepath: Path, + start: int, + end: int | None = None +) -> bool: + """Скачивает диапазон байтов.""" + headers = {"Range": f"bytes={start}-{end if end else ''}"} + logger.info(f"⬇️ Загрузка диапазона {start}-{end if end else 'end'} -> {filepath.name}") + try: + response = await client.get(url, headers=headers, follow_redirects=True) + if response.status_code != 206: + logger.error(f"❌ Ожидался 206 Partial Content, получен: {response.status_code}") + return False + with open(filepath, "wb") as f: + f.write(response.content) + logger.success(f" Диапазон сохранён: {sizeof_fmt(filepath.stat().st_size)} байт") + return True + except Exception as e: + logger.error(f"❌ Ошибка при загрузке диапазона: {e}") + return False + + +async def main(): + logger.info("🚀 Запуск теста экспорта данных") + cert_path = r"C:\Users\23193453\Documents\code\cert\client_cert.pem" + key_path = r"C:\Users\23193453\Documents\code\cert\client_cert.key" + ca_path = r"C:\Users\23193453\Documents\code\cert\client_ca.pem" + + async with httpx.AsyncClient( + cert=(cert_path, key_path), + verify=ca_path, + timeout=TIMEOUT + ) as client: + try: + # --- Шаг 1: Запуск задачи --- + logger.info("📨 Отправка запроса на запуск экспорта OPU...") + response = await client.post(BASE_URL + ENDPOINT_START) + if response.status_code != 200: + logger.error(f"❌ Ошибка запуска задачи: {response.status_code} {response.text}") + return + job_id = response.json()["job_id"] + logger.info(f" Задача запущена: job_id={job_id}") + + # --- Шаг 2: Мониторинг статуса --- + status_url = ENDPOINT_STATUS.format(job_id=job_id) + logger.info("⏳ Ожидание завершения задачи...") + start_wait = asyncio.get_event_loop().time() + + while True: + response = await client.get(BASE_URL + status_url) + if response.status_code != 200: + logger.warning(f"⚠️ Ошибка при получении статуса: {response.status_code}") + await asyncio.sleep(POLL_INTERVAL) + continue + + status_data = response.json() + status = status_data["status"] + total_rows = status_data["total_rows"] + + elapsed = asyncio.get_event_loop().time() - start_wait + logger.debug(f"📊 Статус: {status}, строк обработано: {total_rows}, прошло: {elapsed:.1f} с") + + if status == "completed": + logger.info(f"🎉 Задача завершена! Обработано строк: {total_rows}") + break + elif status == "failed": + logger.error(f"💥 Задача завершилась с ошибкой: {status_data['error']}") + return + elif status in ("pending", "running"): + await asyncio.sleep(POLL_INTERVAL) + continue + + download_url = BASE_URL + ENDPOINT_DOWNLOAD.format(job_id=job_id) + + # --- Тест 1: Полная загрузка --- + full_path = OUTPUT_DIR / f"full_export_{job_id}.jsonl.zst" + if not await download_full(client, download_url, full_path): + return + + # --- Тест 2: Первые 1024 байта --- + range_path = OUTPUT_DIR / f"range_head_{job_id}.bin" + if not await download_range(client, download_url, range_path, start=0, end=1023): + return + + # --- Тест 3: Возобновление с 1024 байта --- + resume_path = OUTPUT_DIR / f"range_resume_{job_id}.bin" + if not await download_range(client, download_url, resume_path, start=1024): + return + + # --- Анализ полного архива --- + archive_size = full_path.stat().st_size + logger.success(f"📦 Полный архив: {sizeof_fmt(archive_size)}") + + # --- Распаковка --- + unpacked_path = OUTPUT_DIR / f"export_{job_id}.jsonl" + logger.info(f"📦 Распаковка архива в: {unpacked_path.name}") + dctx = zstd.ZstdDecompressor() + try: + with open(full_path, "rb") as compressed: + with open(unpacked_path, "wb") as dest: + dctx.copy_stream(compressed, dest) + unpacked_size = unpacked_path.stat().st_size + logger.success(f" Распаковано: {sizeof_fmt(unpacked_size)}") + + # --- Подсчёт строк --- + logger.info("🧮 Подсчёт строк в распакованном файле...") + with open(unpacked_path, "rb") as f: + line_count = sum(1 for _ in f) + logger.success(f" Файл содержит {line_count:,} строк") + + # --- Итог --- + logger.info("📈 ИТОГИ:") + logger.info(f" Архив: {sizeof_fmt(archive_size)}") + logger.info(f" Распаковано: {sizeof_fmt(unpacked_size)}") + logger.info(f" Коэффициент сжатия: {archive_size / unpacked_size:.2f}x") + logger.info(f" Строк: {line_count:,}") + except Exception as e: + logger.exception(f"❌ Ошибка при распаковке: {e}") + + logger.info("🏁 Все тесты завершены успешно!") + + except httpx.ConnectError as e: + logger.critical(f"❌ Не удалось подключиться к серверу. Убедитесь, что сервис запущен. {e}") + except Exception as e: + logger.exception(f"❌ Неожиданная ошибка: {e}") + + +if __name__ == "__main__": + asyncio.run(main()) + + + + + + + +А от код сервиса, который отдает данные OPU - + +а в отдающем сервисе OPU такой код - +service\src\gmap2\models\base.py - +""" +Модуль базовых моделей для ORM. + +Содержит базовый класс и миксин для динамического определения имён таблиц +на основе конфигурации приложения. +""" + +from sqlalchemy.orm import declarative_base, declared_attr + +from gmap2.config import APP_CONFIG + +TABLE_NAME_MAPPING = { + "opudata": APP_CONFIG.gp.opu_table, + "lprnewsdata": APP_CONFIG.gp.lpr_news_table, +} + + +BASE = declarative_base() + + +class DynamicTableMixin: # pylint: disable=too-few-public-methods + """ + Миксин для динамического определения имени таблицы через __tablename__. + + Имя таблицы определяется по имени класса в нижнем регистре. + Если имя класса присутствует в TABLE_NAME_MAPPING, используется значение из маппинга. + В противном случае используется имя класса как имя таблицы. + """ + + @declared_attr + def __tablename__(cls) -> str: # pylint: disable=no-self-argument + """ + Динамически возвращает имя таблицы для модели. + + Использует маппинг из конфигурации, если имя класса известно. + В противном случае возвращает имя класса в нижнем регистре. + + :return: Имя таблицы, используемое SQLAlchemy при работе с моделью. + """ + class_name = cls.__name__.lower() + return TABLE_NAME_MAPPING.get(class_name, class_name) + +service\src\gmap2\models\opu.py - +""" +ORM-модель для таблицы OPU. + +Замечание: + В БД отсутствует PRIMARY KEY. Для корректной работы SQLAlchemy + используется виртуальный составной первичный ключ (wf_row_id, wf_load_id). + Это не влияет на экспорт, но требуется для итерации через ORM. +""" + +from sqlalchemy import Column, Date, DateTime, Float, Integer, String + +from .base import BASE, DynamicTableMixin + + +class OPUData(DynamicTableMixin, BASE): # pylint: disable=too-few-public-methods + """ + Данные из таблицы brief_opu. + """ + + __table_args__ = {"schema": "brief_opu_schema"} + + wf_row_id = Column(Integer, primary_key=True, nullable=False) + wf_load_id = Column(Integer, primary_key=True, nullable=False) + + object_id = Column(String) + object_nm = Column(String) + desk_nm = Column(String) + object_tp = Column(String) + object_unit = Column(String) + actdate = Column(Date) + layer_cd = Column(String) + layer_nm = Column(String) + measure = Column(String) + opu_cd = Column(String) + opu_nm_sh = Column(String) + opu_nm = Column(String) + opu_lvl = Column(Integer) + product_nm = Column(String) + opu_prnt_cd = Column(String) + opu_prnt_nm_sh = Column(String) + opu_prnt_nm = Column(String) + product_prnt_nm = Column(String) + sum_amountrub_p_usd = Column(Float) + wf_load_dttm = Column(DateTime) + +service\src\gmap2\repositories\base.py - +""" +Базовый репозиторий с общим функционалом стриминга. +""" + +from collections.abc import AsyncGenerator, Callable +from typing import Generic, List, Optional, TypeVar, Union +from typing_extensions import Self + +from loguru import logger +from sqlalchemy import Select, select +from sqlalchemy.ext.asyncio import AsyncSession + +_T_Model = TypeVar("_T_Model") # pylint: disable=invalid-name + + +class BaseRepository(Generic[_T_Model]): + """ + Абстрактный репозиторий с поддержкой стриминга. + """ + + model: type[_T_Model] + default_order_by: Union[List[Callable[[], List]], List] = [] + + def __init__(self, session: AsyncSession) -> None: + """ + Инициализирует репозиторий. + + Args: + session: Асинхронная сессия SQLAlchemy. + """ + self.session = session + + async def stream_all( + self, + chunk_size: int = 10_000, + statement: Optional[Select[tuple[_T_Model]]] = None, + ) -> AsyncGenerator[list[_T_Model], None]: + """ + Потоково извлекает записи из БД пачками. + + Выполняет стриминг данных с использованием асинхронного курсора. + Поддерживает кастомный запрос и автоматическое упорядочивание. + + :param chunk_size: Размер пачки записей для одной итерации. + :param statement: Опциональный SQL-запрос. + :yield: Список экземпляров модели, загруженных порциями. + """ + logger.info(f"Streaming {self.model.__name__} in batches of {chunk_size}") + + stmt = statement or select(self.model) + + if not statement and self.default_order_by: + stmt = stmt.order_by(*self.default_order_by) + + try: + result = await self.session.stream( + stmt.execution_options( + stream_results=True, + max_row_count=chunk_size, + ) + ) + + partitions_method = result.partitions + partitions_result = partitions_method(chunk_size) + + async for partition in partitions_result: + items = [row[0] for row in partition] + yield items + + logger.info(f"Finished streaming {self.model.__name__}") + except Exception as e: + logger.error( + f"Error during streaming {self.model.__name__}: {type(e).__name__}: {e}" + ) + raise + + @classmethod + def create(cls, session: AsyncSession) -> Self: + """ + Фабричный метод. + + Args: + session: Асинхронная сессия. + + Returns: + Экземпляр репозитория. + """ + return cls(session=session) + +service\src\gmap2\repositories\opu_repository.py - +""" +Репозиторий для OPU. +""" + +from gmap2.models.opu import OPUData + +from .base import BaseRepository + + +class OPURepository(BaseRepository[OPUData]): + """ + Репозиторий для OPUData. + """ + + model = OPUData + default_order_by = [ + OPUData.wf_load_id, + OPUData.wf_row_id, + ] + +service\src\gmap2\api\v1\routes.py - +""" +Маршруты API для запуска, статуса и скачивания экспорта. +""" + +from fastapi import APIRouter, BackgroundTasks, HTTPException, Request + +from gmap2.services.job.background_worker import ( + run_lpr_news_export_job, + run_opu_export_job, +) +from gmap2.services.job.job_manager import JobManager +from gmap2.utils.file_utils import create_range_aware_response + +from .schemas import ExportJobStatus, StartExportResponse + +router = APIRouter(prefix="/export", tags=["export"]) + + +def get_job_manager(request: Request) -> JobManager: + """ + Извлекает JobManager из application state. + """ + return request.app.state.job_manager + + +@router.post("/opu/start", response_model=StartExportResponse) +async def start_opu_export( + request: Request, background_tasks: BackgroundTasks +) -> StartExportResponse: + """ + Запускает фоновую задачу экспорта данных OPU. + + Returns: + Идентификатор задачи. + """ + job_manager = get_job_manager(request) + job_id = job_manager.start_job("opu") + + background_tasks.add_task(run_opu_export_job, job_id, job_manager) + + return StartExportResponse(job_id=job_id) + + +@router.post("/lpr-news/start", response_model=StartExportResponse) +async def start_lpr_news_export( + request: Request, background_tasks: BackgroundTasks +) -> StartExportResponse: + """ + Запускает фоновую задачу экспорта данных LPR News. + + Returns: + Идентификатор задачи. + """ + job_manager = get_job_manager(request) + job_id = job_manager.start_job("lpr_news") + + background_tasks.add_task(run_lpr_news_export_job, job_id, job_manager) + + return StartExportResponse(job_id=job_id) + + +@router.get("/{job_id}/status", response_model=ExportJobStatus) +async def get_export_status(job_id: str, request: Request) -> ExportJobStatus: + """ + Возвращает текущий статус задачи экспорта. + + Args: + job_id: Идентификатор задачи. + + Returns: + Статус задачи. + + Raises: + HTTPException 404: Если задача не найдена. + """ + job_manager = get_job_manager(request) + status = job_manager.get_job_status(job_id) + if not status: + raise HTTPException(status_code=404, detail="Job not found") + return ExportJobStatus(**status) + + +@router.get("/{job_id}/download") +async def download_export(job_id: str, request: Request): + """ + Стримит сжатый файл экспорта клиенту с поддержкой Range-запросов. + """ + job_manager = get_job_manager(request) + status = job_manager.get_job_status(job_id) + + if not status: + raise HTTPException(status_code=404, detail="Job not found") + + if status["status"] != "completed": + raise HTTPException( + status_code=409, detail=f"Job is {status['status']}, not completed" + ) + + file_path = status.get("temp_file_path") + if not file_path or not file_path.endswith(".jsonl.zst"): + raise HTTPException(status_code=500, detail="Export file not available") + + filename = f"export_{job_id}.jsonl.zst" + + return await create_range_aware_response( + file_path=file_path, + filename=filename, + request=request, + media_type="application/octet-stream", + ) + +service\src\gmap2\services\export\export_service.py - +""" +Сервис экспорта данных: объединяет репозиторий, форматирование и сжатие. +Формат: JSON Lines + Zstandard (.jsonl.zst), один непрерывный zstd-фрейм. +""" + +from __future__ import annotations + +import asyncio +import math +import os +import tempfile +import threading +from collections.abc import AsyncGenerator, Callable +from contextlib import asynccontextmanager +from datetime import date, datetime +from typing import Any, Tuple + +import aiofiles +import zstandard as zstd +from loguru import logger +from orjson import OPT_NAIVE_UTC, OPT_UTC_Z # pylint: disable=no-name-in-module +from orjson import dumps as orjson_dumps # pylint: disable=no-name-in-module +from sqlalchemy.ext.asyncio import AsyncSession + +from gmap2.repositories import LPRNewsRepository, OPURepository + +from ..job.job_manager import JobManager +from .formatters import models_to_dicts + + +class _ZstdAsyncSink: + """ + Потокобезопасный приёмник для zstd.stream_writer. + + Собирает сжатые данные по чанкам в памяти с использованием блокировки. + Предназначен для использования в асинхронном контексте, + где сжатие выполняется в отдельном потоке. + """ + + __slots__ = ("_chunks", "_lock") + + def __init__(self) -> None: + """ + Инициализирует приёмник с пустым списком чанков и блокировкой. + """ + self._chunks: list[bytes] = [] + self._lock = threading.Lock() + + def write(self, b: bytes) -> int: + """ + Записывает байтовый фрагмент в буфер. + + Метод потокобезопасен. + Копирует данные и добавляет в внутренний список чанков. + + :param b: Байтовые данные для записи. + :return: Длина записанных данных. + """ + with self._lock: + self._chunks.append(bytes(b)) + return len(b) + + def drain(self) -> list[bytes]: + """ + Извлекает все накопленные чанки, сбрасывая внутренний буфер. + + Метод потокобезопасен. + Возвращает список всех чанков, записанных с момента последнего сброса. + + :return: Список байтовых фрагментов. + """ + with self._lock: + chunks = self._chunks + self._chunks = [] + return chunks + + +class ExportService: + """ + Основной сервис экспорта данных в формате JSON Lines + zstd. + """ + + def __init__(self, chunk_size: int = 10_000, zstd_level: int = 3) -> None: + self.chunk_size = chunk_size + self.zstd_level = zstd_level + + @asynccontextmanager + async def _export_to_zstd( # pylint: disable=too-many-arguments,too-many-locals + self, + *, + session: AsyncSession, + job_id: str, + job_manager: JobManager, + repo_factory: Callable[[AsyncSession], Any], + label: str, + temp_dir: str | None = None, + ) -> AsyncGenerator[Tuple[AsyncGenerator[bytes, None], str], None]: + repo = repo_factory(session) + file_path: str | None = None + try: + with tempfile.NamedTemporaryFile( + delete=False, suffix=".jsonl.zst", dir=temp_dir + ) as tmp: + file_path = tmp.name + + logger.info(f"[export] {label}: start -> {file_path}") + + cctx = zstd.ZstdCompressor(level=self.zstd_level) + sink = _ZstdAsyncSink() + + async with aiofiles.open(file_path, "wb") as f: + writer = cctx.stream_writer(sink) + try: + async for batch in repo.stream_all(chunk_size=self.chunk_size): + if not batch: + continue + dicts = models_to_dicts(batch) + payload = ("\n".join(_dumps(d) for d in dicts) + "\n").encode( + "utf-8" + ) + + await asyncio.to_thread(writer.write, payload) + + for chunk in sink.drain(): + await f.write(chunk) + + job_manager.increment_rows(job_id, len(batch)) + + await asyncio.to_thread(writer.flush, zstd.FLUSH_FRAME) + + for chunk in sink.drain(): + await f.write(chunk) + finally: + await asyncio.to_thread(writer.close) + for chunk in sink.drain(): + await f.write(chunk) + await f.flush() + + logger.info(f"[export] {label}: done -> {file_path}") + yield _stream_file(file_path), file_path + + except Exception: + if file_path and os.path.exists(file_path): + await asyncio.to_thread(os.remove, file_path) + logger.exception(f"[export] {label}: failed, temporary file removed") + raise + + @asynccontextmanager + async def export_opu_to_zstd( + self, + session: AsyncSession, + job_id: str, + job_manager: JobManager, + temp_dir: str | None = None, + ) -> AsyncGenerator[Tuple[AsyncGenerator[bytes, None], str], None]: + """ + Экспорт OPU в один непрерывный zstd-поток. + """ + async with self._export_to_zstd( + session=session, + job_id=job_id, + job_manager=job_manager, + repo_factory=OPURepository, + label="OPU", + temp_dir=temp_dir, + ) as ctx: + yield ctx + + @asynccontextmanager + async def export_lpr_news_to_zstd( + self, + session: AsyncSession, + job_id: str, + job_manager: JobManager, + temp_dir: str | None = None, + ) -> AsyncGenerator[Tuple[AsyncGenerator[bytes, None], str], None]: + """ + Экспорт LPR News в один непрерывный zstd-поток. + """ + async with self._export_to_zstd( + session=session, + job_id=job_id, + job_manager=job_manager, + repo_factory=LPRNewsRepository, + label="LPR News", + temp_dir=temp_dir, + ) as ctx: + yield ctx + + +def _dumps(data: dict) -> str: + """ + Сериализует словарь в JSON-строку (orjson). + """ + + return orjson_dumps( + data, + default=_serialize_value, + option=OPT_UTC_Z | OPT_NAIVE_UTC, + ).decode("utf-8") + + +def _serialize_value(value: Any) -> Any: + """ + Преобразует значения к JSON-совместимому виду. + """ + if isinstance(value, (datetime, date)): + return value.isoformat() + if isinstance(value, float) and not math.isfinite(value): + return None + return value + + +async def _stream_file( + file_path: str, chunk_size: int = 8192 +) -> AsyncGenerator[bytes, None]: + """ + Асинхронно читает файл блоками. + """ + async with aiofiles.open(file_path, "rb") as f: + while chunk := await f.read(chunk_size): + yield chunk + +service\src\gmap2\services\export\compressors.py - +""" +Работа со сжатием данных с использованием zstandard. +""" + +from typing import BinaryIO + +import zstandard as zstd + + +def create_zstd_writer(fileobj: BinaryIO, level: int = 3) -> zstd.ZstdCompressionWriter: + """ + Создаёт сжатый writer поверх бинарного файла. + + Args: + fileobj: Целевой файл (например, tempfile). + level: Уровень сжатия (1–10). По умолчанию 3 - баланс скорости и размера. + + Returns: + Объект для записи сжатых данных. + """ + cctx = zstd.ZstdCompressor(level=level) + return cctx.stream_writer(fileobj) + +service\src\gmap2\services\export\formatters.py - +""" +Форматирование ORM-объектов в словари. +С оптимизацией: кеширование структуры модели. +""" + +from datetime import date, datetime +from decimal import Decimal +from typing import Any, Dict, List +from weakref import WeakKeyDictionary + +from sqlalchemy import inspect +from sqlalchemy.orm import RelationshipProperty + +_columns_cache: WeakKeyDictionary = WeakKeyDictionary() + + +def _get_model_columns(model_instance: Any) -> List[str]: + """ + Возвращает список имен колонок модели (без отношений). + Кеширует результат. + """ + model_class = model_instance.__class__ + if model_class not in _columns_cache: + mapper = inspect(model_class) + columns = [ + attr.key + for attr in mapper.attrs + if not isinstance(attr, RelationshipProperty) + ] + _columns_cache[model_class] = columns + return _columns_cache[model_class] + + +def _serialize_value(value: Any) -> Any: + """ + Сериализует значение в JSON-совместимый формат. + """ + if isinstance(value, (datetime, date)): + return value.isoformat() + if isinstance(value, Decimal): + return float(value) + if value is None: + return None + return value + + +def models_to_dicts(instances: List[Any]) -> List[Dict[str, Any]]: + """ + Массовое преобразование списка ORM-объектов в словари. + + Использует кеширование структуры модели для высокой производительности. + + Args: + instances: Список ORM-объектов. + + Returns: + Список словарей. + """ + if not instances: + return [] + + columns = _get_model_columns(instances[0]) + return [ + {col: _serialize_value(getattr(obj, col)) for col in columns} + for obj in instances + ] + + +__all__ = [ + "models_to_dicts", +] + +service\src\gmap2\services\job\background_worker.py - +""" +Фоновые задачи экспорта. +""" + +from psycopg import errors as pg_errors +from loguru import logger +from sqlalchemy.exc import DatabaseError, OperationalError +from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential, +) + +from gmap2.context import APP_CTX +from gmap2.services.export.export_service import ExportService + +from typing import Optional + +TRANSIENT_ERRORS = ( + OperationalError, + DatabaseError, + ConnectionError, + TimeoutError, + pg_errors.ConnectionException, + pg_errors.AdminShutdown, + pg_errors.CannotConnectNow, +) + + +def _create_export_job( + export_method_name: str, + job_type: str, +): + """ + Фабрика фоновых задач экспорта. + + Args: + export_method_name: Название метода экспорта в ExportService. + job_type: Тип задачи ("opu", "lpr_news"). + + Returns: + Асинхронная функция-задача. + """ + + @retry( + stop=stop_after_attempt(10), + wait=wait_exponential(multiplier=1, max=60), + retry=retry_if_exception_type(TRANSIENT_ERRORS), + reraise=True, + ) + async def run_export_job( + job_id: str, job_manager, temp_dir: Optional[str] = None + ) -> None: + export_service = ExportService() + try: + async with APP_CTX.get_db_session() as session: + job_manager.mark_running(job_id) + + export_method = getattr(export_service, export_method_name) + async with export_method( + session=session, + job_id=job_id, + job_manager=job_manager, + temp_dir=temp_dir, + ) as (stream, file_path): + async for _ in stream: + pass + + job_manager.mark_completed(job_id, file_path) + + except Exception as e: + if not isinstance(e, TRANSIENT_ERRORS): + job_manager.mark_failed(job_id, f"{type(e).__name__}: {e}") + logger.exception(f"Export job {job_id} ({job_type}) failed") + raise + + run_export_job.__name__ = f"run_{job_type}_export_job" + run_export_job.__doc__ = f"Фоновая задача экспорта {job_type.upper()} с retry." + + return run_export_job + + +run_opu_export_job = _create_export_job("export_opu_to_zstd", "opu") +run_lpr_news_export_job = _create_export_job("export_lpr_news_to_zstd", "lpr_news") diff --git a/PIPELINE_TENERA.md b/PIPELINE_TENERA.md new file mode 100644 index 0000000..3f168f4 --- /dev/null +++ b/PIPELINE_TENERA.md @@ -0,0 +1,1201 @@ +Это вырезка файлов из ETL сервиса старого. Оно работало, но местами было не совсем оптимально. +нужно сделать из этого файла полноценный ETL задачу для нашего сервиса, раскидать куда надо интерфейс, модели и т.п и зарегистрировать задачу. + +src\dataloader\interfaces\tenera\interface.py - +"""Интерфейс для взаимодействия с сервисом SuperTenera. + +Позволяет запросить актуальные данные по котировкам с SuperTenera в json. +""" + +import json +from pathlib import Path +import ssl +import uuid +from asyncio import TimeoutError +from datetime import datetime +from typing import Any, Literal, Self + +import aiohttp + +from dataloader.config import APP_CONFIG +from dataloader.interfaces.tenera.schemas import MainData +from dataloader.interfaces.utils import log_method_call + + +class SuperTeneraConnectionError(Exception): + """Ошибка подключения к SuperTenera API.""" + + +class SuperTeneraInterface: + """Интерфейс для взаимодействия с сервисом SuperTenera по http.""" + + def __init__( + self, + logger, + base_url: str, + *, + timezone=None, + ) -> None: + """ + Constructor. + + Args: + logger: Logger instance. + base_url: base service url. + timezone: timezone for datetime objects. + """ + self.logger = logger + self.base_url = base_url + self.timezone = timezone + + self._session: aiohttp.ClientSession | None = None + + self._ssl_context = None + if APP_CONFIG.app.local: + self._ssl_context = ssl.create_default_context(cafile=APP_CONFIG.certs.ca_bundle_file) + self._ssl_context.load_cert_chain(certfile=APP_CONFIG.certs.cert_file, keyfile=APP_CONFIG.certs.key_file) + + def form_base_headers(self) -> dict: + """Form metadata for call.""" + metadata_pairs = { + "request-id": str(uuid.uuid4()), + "request-time": str(datetime.now(tz=self.timezone).isoformat()), + "system-id": APP_CONFIG.app.system_id, + } + return {metakey: metavalue for metakey, metavalue in metadata_pairs.items() if metavalue} + + async def __aenter__(self) -> Self: + """Async context manager enter.""" + self._session = aiohttp.ClientSession( + base_url=self.base_url, + connector=aiohttp.TCPConnector(limit=100), + headers=self.form_base_headers(), + ) + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: + """Async context manager exit.""" + if exc_val is not None: + self.logger.error(f"{exc_type}: {exc_val}") + await self._session.close() + + async def _get_request( + self, + url: str, + encoding: str | None = None, + content_type: str | None = "application/json", + **kwargs, + ) -> Any: + """Get realization.""" + + kwargs["ssl"] = self._ssl_context + + async with self._session.get(url, **kwargs) as response: + if APP_CONFIG.app.debug: + self.logger.debug(f"Response: {(await response.text(errors='ignore'))[:100]}") + return await response.json(encoding=encoding, content_type=content_type) + + @log_method_call + async def get_quotes_data(self) -> MainData: + """Получить данные котировок от SuperTenera""" + data = await self._get_request(APP_CONFIG.supertenera.quotes_endpoint) + + # mock_path = Path(__file__).parent / "supertenera_response.json" + # with open(mock_path, "r", encoding="utf-8") as f: + # data = json.load(f) + + return MainData.model_validate(data) + + @log_method_call + async def ping(self, **kwargs) -> Literal[True]: + """ + Быстрая проверка доступности SuperTenera API. + + True - если ответ < 400, иначе SuperTeneraConnectionError. + """ + kwargs["ssl"] = self._ssl_context + try: + async with self._session.get( + APP_CONFIG.supertenera.quotes_endpoint, timeout=APP_CONFIG.supertenera.timeout, **kwargs + ) as resp: + resp.raise_for_status() + return True + except aiohttp.ClientResponseError as e: + raise SuperTeneraConnectionError( + f"Ошибка подключения к SuperTenera API при проверке системы - {e.status}." + ) from e + except TimeoutError as e: + raise SuperTeneraConnectionError( + f"Ошибка Timeout подключения к SuperTenera API при проверке системы." + ) from e + + +def get_async_tenera_interface() -> SuperTeneraInterface: + """Get SuperTenera instance.""" + from dataloader.context import APP_CTX + + return SuperTeneraInterface( + logger=APP_CTX.get_logger(), + base_url=APP_CTX.get_tenera_base_url(), + timezone=APP_CTX.get_pytz_timezone(), + ) + +src\dataloader\interfaces\tenera\schemas.py - +"""Схемы ответов""" + +from __future__ import annotations + +import re +from typing import Literal + +from pydantic import BaseModel, ConfigDict, Field, RootModel, field_validator + + +class TeneraBaseModel(BaseModel): + """Базовая модель для всех схем SuperTenera с настройкой populate_by_name.""" + + model_config = ConfigDict( + populate_by_name=True, + extra="ignore", + ) + + +# --- TimePoint models --- + + +class EmptyTimePoint(TeneraBaseModel): + """ + Модель заглушка для полностью пустых значений в точке времени. + Позволяет корректно валидировать случаи, когда JSON поле {} без содержимого. + """ + + pass # pylint: disable=unnecessary-pass + + +class CbrTimePoint(TeneraBaseModel): + """ + Структура данных точки времени для источника Центрального банка России (ЦБР). + + Поля: + - value: Строка с числовым значением ("80,32") + """ + + value: str + + +class InvestingNumeric(TeneraBaseModel): + """ + Структура данных точки времени для источника Investing.com в формате по странам. + + Поля (alias на русском): + - profit: Доходность + - base_value: Базовое + - max_value: Максимальное + - min_value: Минимальное + - change: Изменение + - change_ptc: Процент изменений + """ + + profit: str = Field(alias="Доходность") + base_value: str = Field(alias="Осн.") + max_value: str = Field(alias="Макс.") + min_value: str = Field(alias="Мин.") + change: str = Field(alias="Изм.") + change_ptc: str = Field(alias="Изм. %") + + +class InvestingCandlestick(TeneraBaseModel): + """ + Структура данных точки времени для источника Investing.com в формате свечи. + + Поля (alias латинскими заглавными буквами): + - open_: "O" + - high: "H" + - low: "L" + - close: "C" + - interest: "I" | None + - value: "V" + """ + + open_: str = Field(alias="O") + high: str = Field(alias="H") + low: str = Field(alias="L") + close: str = Field(alias="C") + interest: str | None = Field(alias="I") + value: str = Field(alias="V") + + +class InvestingTimePoint(RootModel[EmptyTimePoint | InvestingNumeric | InvestingCandlestick]): + """ + Union-модель точки времени для источника Investing.com. + + 1) {} -> EmptyTImePoint + 2) numeric -> InvestingNumeric + 3) свечной -> InvestingCandlestick + """ + + pass # pylint: disable=unnecessary-pass + + +class SgxTimePoint(TeneraBaseModel): + """ + Структура данных точки времени для источника Сингапурской биржи (SGX). + + Поля (alias латинскими заглавными буквами): + - open_: "O" + - high: "H" + - low: "L" + - close: "C" + - interest: "I" + - value: "V" + """ + + open_: str | None = Field(alias="O") + high: str | None = Field(alias="H") + low: str | None = Field(alias="L") + close: str | None = Field(alias="C") + interest: str | None = Field(alias="I") + value: str | None = Field(alias="V") + + +class TradingEconomicsEmptyString(RootModel[str]): + """ + Валидирует точно пустую строку (""). + + Используется для точек данных TradingEconomics, содержащих пустые строковые значения. + + Поля: + - root: Строка, которая должна быть точно пустой ("") + """ + + root: str + + @field_validator("root", mode="before") + @classmethod + def _must_be_empty(cls, v) -> Literal[""]: + if v == "": + return v + raise ValueError("not an empty string") + + +class TradingEconomicsStringPercent(RootModel[str]): + """ + Валидирует строки-проценты вида "3.1%" или "-0,14%". + + Принимает как запятую, так и точку в качестве десятичного разделителя. + Шаблон: опциональный минус, цифры, опциональная десятичная часть, знак процента. + + Поля: + root: Строка с процентным значением (например: "3.1%", "-0,14%", "15%") + """ + + root: str + + @field_validator("root") + @classmethod + def _check_percent(cls, v) -> str: + if isinstance(v, str) and re.match(r"^-?\d+(?:[.,]\d+)?%$", v): + return v + raise ValueError(f"invalid percent string: {v!r}") + + +class TradingEconomicsStringTime(RootModel[str]): + """ + Валидирует строки времени в формате "h:mm AM/PM". + + Примеры: "01:15 AM", "12:30 PM", "9:45 AM" + + Поля: + root: Строка времени в 12-часовом формате с AM/PM + """ + + root: str + + @field_validator("root") + @classmethod + def _check_time(cls, v) -> str: + if isinstance(v, str) and re.match(r"^(0?[1-9]|1[0-2]):[0-5]\d\s[AP]M$", v): + return v + raise ValueError(f"invalid time string: {v!r}") + + +class TradingEconomicsNumeric(TeneraBaseModel): + """ + Полный числовой формат данных от TradingEconomics. + + Содержит полную рыночную информацию с ценой, дневным изменением, процентами + и различными периодическими изменениями (недельными, месячными, с начала года, год к году). + + Поля: + price: Текущая цена инструмента (алиас: "Price") + day: Дневное изменение в абсолютных значениях (алиас: "Day") + percent: Дневное изменение в процентах (алиас: "%") + weekly: Недельное изменение (алиас: "Weekly") + monthly: Месячное изменение (алиас: "Monthly") + ytd: Изменение с начала года (алиас: "YTD") + yoy: Изменение год к году (алиас: "YoY") + """ + + price: str = Field(alias="Price") + day: str = Field(alias="Day") + percent: str = Field(alias="%") + weekly: str = Field(alias="Weekly") + monthly: str = Field(alias="Monthly") + ytd: str = Field(alias="YTD") + yoy: str = Field(alias="YoY") + + +class TradingEconomicsLastPrev(TeneraBaseModel): + """ + Формат Last/Previous/Unit от TradingEconomics. + + Содержит текущее значение, предыдущее значение и единицу измерения. + Обычно используется для экономических индикаторов и статистики. + + Поля: + last: Последнее (текущее) значение показателя (алиас: "Last") + previous: Предыдущее значение показателя (алиас: "Previous") + unit: Единица измерения показателя (алиас: "Unit") + """ + + last: str = Field(alias="Last") + previous: str = Field(alias="Previous") + unit: str = Field(alias="Unit") + + +class TradingEconomicsTimePoint( + RootModel[ + EmptyTimePoint + | TradingEconomicsEmptyString + | TradingEconomicsStringPercent + | TradingEconomicsStringTime + | TradingEconomicsNumeric + | TradingEconomicsLastPrev + ] +): + """ + Объединение всех возможных форматов точек времени TradingEconomics. + + Поддерживает: + - Пустые объекты ({}) + - Пустые строки ("") + - Строки-проценты ("3.1%", "-0,14%") + - Строки времени ("01:15 AM") + - Полные числовые объекты с полями цена/день/% + - Объекты Last/Previous/Unit для экономических индикаторов + + Поля: + root: Один из поддерживаемых типов точек времени TradingEconomics + """ + + pass # pylint: disable=unnecessary-pass + + +class BloombergTimePoint(TeneraBaseModel): + """ + Структура данных точки времени для источника Bloomberg. + + Поля: + - value: Строка с числовым значением ("80,32") + """ + + value: str + + +class TradingViewTimePoint(TeneraBaseModel): + """ + Структура данных точки времени для источника TradingView. + + Поля (alias латинскими заглавными буквами): + - open_: "O" + - high: "H" + - low: "L" + - close: "C" + - volume: "Vol" + """ + + open_: str | None = Field(alias="O") + high: str | None = Field(alias="H") + low: str | None = Field(alias="L") + close: str | None = Field(alias="C") + volume: str | None = Field(alias="Vol") + + +class TimePointUnion( + RootModel[ + EmptyTimePoint + | CbrTimePoint + | InvestingTimePoint + | SgxTimePoint + | TradingEconomicsTimePoint + | BloombergTimePoint + | TradingViewTimePoint + ] +): + """ + Универсальное объединение для точек времени от всех поддерживаемых источников данных. + + Обрабатывает структуры данных от: + - ЦБР (Центральный банк России) + - Investing.com + - SGX (Сингапурская биржа) + - TradingEconomics + - Bloomberg + - TradingView + + Поля: + root: Точка времени от любого из поддерживаемых источников данных + """ + + pass # pylint: disable=unnecessary-pass + + +InstrumentData = dict[str | int, TimePointUnion] +"""Тип: Отображение timestamp -> TimePointUnion.""" + +SourceData = dict[str, InstrumentData] +"""Тип: Отображение имени инструмента -> InstrumentData.""" + + +class MainData(TeneraBaseModel): + """ + Основной контейнер данных для всех источников финансовых данных от SuperTenera. + + Содержит опциональные данные от нескольких поставщиков финансовых данных, + структурированные по источникам, а затем по инструментам. + + Поля: + cbr: Данные от Центрального банка России (опционально) + investing: Данные от Investing.com (опционально) + sgx: Данные от Сингапурской биржи (опционально) + tradingeconomics: Данные от TradingEconomics (опционально) + bloomberg: Данные от Bloomberg (опционально) + trading_view: Данные от TradingView (опционально, алиас: "trading_view") + """ + + cbr: SourceData | None = None + investing: SourceData | None = None + sgx: SourceData | None = None + tradingeconomics: SourceData | None = None + bloomberg: SourceData | None = None + trading_view: SourceData | None = Field(default=None, alias="trading_view") + + @field_validator("investing", mode="before") + @classmethod + def _filter_investing(cls, v) -> SourceData | None: + """ + Фильтрация данных от Investing.com. + + Убираем: + - все ключи, у которых значение null + - все ключи, которые выглядят как чистые числа (timestamps) + + :param v: Объект с данными от Investing.com + :return: Отфильтрованный объект + """ + + if isinstance(v, dict): + return {key: value for key, value in v.items() if value is not None and not str(key).isdigit()} + return v + +src\dataloader\interfaces\tenera\__init__.py - +from . import schemas +from .interface import SuperTeneraInterface, get_async_tenera_interface + +__all__ = [ + "schemas", + "SuperTeneraInterface", + "get_async_tenera_interface", +] + +src\dataloader\models\quote.py - +"""Quote модель.""" + +from __future__ import annotations + +from datetime import datetime +from typing import TYPE_CHECKING + +from sqlalchemy import JSON, TIMESTAMP, BigInteger, ForeignKey, String, UniqueConstraint, func +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from dataloader.base import Base + +if TYPE_CHECKING: + from .quote_section import QuoteSection + from .quote_value import QuoteValue + + +class Quote(Base): + """Представляет custom_cib_quotes.quotes.""" + + __tablename__ = "quotes" + __table_args__ = (UniqueConstraint("quote_sect_id", "name", name="ak_uq_quote_name_and_quotes"),) + + quote_id: Mapped[int] = mapped_column(BigInteger(), primary_key=True) + name: Mapped[str] = mapped_column(String, nullable=False) + params: Mapped[dict | None] = mapped_column(JSON) + srce: Mapped[str | None] = mapped_column(String) + ticker: Mapped[str | None] = mapped_column(String) + quote_sect_id: Mapped[int] = mapped_column( + ForeignKey("quotes_sect.quote_sect_id", ondelete="CASCADE", onupdate="CASCADE"), + nullable=False, + ) + last_update_dttm: Mapped[datetime | None] = mapped_column(TIMESTAMP(timezone=True)) + + load_dttm: Mapped[datetime] = mapped_column( + TIMESTAMP(timezone=False), + nullable=False, + server_default=func.current_timestamp(), + ) + + section: Mapped[QuoteSection] = relationship(back_populates="quotes") + values: Mapped[list[QuoteValue]] = relationship( + back_populates="quote", + cascade="all, delete-orphan", + ) + + def __repr__(self) -> str: + return f"" + +src\dataloader\models\quote_value.py - +"""Quote-value модель.""" + +from __future__ import annotations + +from datetime import datetime +from typing import TYPE_CHECKING + +from sqlalchemy import TIMESTAMP, BigInteger, DateTime, Float, ForeignKey, String, UniqueConstraint, func +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from dataloader.base import Base + +if TYPE_CHECKING: + from .quote import Quote + + +class QuoteValue(Base): + """Представляет custom_cib_quotes.quotes_values.""" + + __tablename__ = "quotes_values" + __table_args__ = (UniqueConstraint("quote_id", "dt", name="ak_uq_quote_and_date_quotes"),) + + quotes_values_id: Mapped[int] = mapped_column( + BigInteger(), + primary_key=True, + autoincrement=True, + ) + quote_id: Mapped[int] = mapped_column( + ForeignKey("quotes.quote_id", ondelete="RESTRICT", onupdate="RESTRICT"), + nullable=False, + ) + dt: Mapped[datetime] = mapped_column(DateTime, nullable=False) + + price_o: Mapped[float | None] = mapped_column(Float) + price_c: Mapped[float | None] = mapped_column(Float) + price_h: Mapped[float | None] = mapped_column(Float) + price_l: Mapped[float | None] = mapped_column(Float) + volume: Mapped[float | None] = mapped_column(Float) + + load_dttm: Mapped[datetime] = mapped_column( + TIMESTAMP(timezone=False), + nullable=False, + server_default=func.current_timestamp(), + ) + + unit: Mapped[str | None] = mapped_column(String) + key: Mapped[int | None] = mapped_column(BigInteger()) + + value_profit: Mapped[float | None] = mapped_column(Float) + value_base: Mapped[float | None] = mapped_column(Float) + value_max: Mapped[float | None] = mapped_column(Float) + value_min: Mapped[float | None] = mapped_column(Float) + value_chng: Mapped[float | None] = mapped_column(Float) + value_chng_prc: Mapped[float | None] = mapped_column(Float) + + price_i: Mapped[float | None] = mapped_column(Float) + price: Mapped[float | None] = mapped_column(Float) + value_day: Mapped[float | None] = mapped_column(Float) + value_prc: Mapped[float | None] = mapped_column(Float) + value_weekly_prc: Mapped[float | None] = mapped_column(Float) + value_monthly_prc: Mapped[float | None] = mapped_column(Float) + value_ytd_prc: Mapped[float | None] = mapped_column(Float) + value_yoy_prc: Mapped[float | None] = mapped_column(Float) + value_last: Mapped[float | None] = mapped_column(Float) + value_previous: Mapped[float | None] = mapped_column(Float) + + is_empty_str_flg: Mapped[bool | None] = mapped_column() + interest: Mapped[float | None] = mapped_column(Float) + + quote: Mapped[Quote] = relationship(back_populates="values") + + def __repr__(self) -> str: + return f"" + +src\dataloader\models\quote_section.py - +"""Quote-section модель.""" + +from __future__ import annotations + +from datetime import datetime +from typing import TYPE_CHECKING + +from sqlalchemy import JSON, TIMESTAMP, Integer, Sequence, String, func +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from dataloader.base import Base + +if TYPE_CHECKING: + from .quote import Quote + + +class QuoteSection(Base): + """Представляет custom_cib_quotes.quotes_sect.""" + + __tablename__ = "quotes_sect" + + quote_sect_id: Mapped[int] = mapped_column(Integer(), Sequence("quotes_section_id_seq"), primary_key=True) + name: Mapped[str] = mapped_column(String, nullable=False) + params: Mapped[dict | None] = mapped_column(JSON) + load_dttm: Mapped[datetime] = mapped_column( + TIMESTAMP(timezone=False), + nullable=False, + server_default=func.current_timestamp(), + ) + + quotes: Mapped[list[Quote]] = relationship( + back_populates="section", + cascade="all, delete-orphan", + ) + + def __repr__(self) -> str: + return f"" + +src\dataloader\cruds\base.py - +"""Базовый класс для CRUD""" + +from typing import Generic, Protocol, TypeVar + +import sqlalchemy as sa +from sqlalchemy import exc +from sqlalchemy.ext.asyncio import AsyncSession + +from dataloader.cruds.exceptions import InvalidDataException + + +class BaseProtocol(Protocol): # pylint: disable=too-few-public-methods + """Протокол для таблиц с id""" + + id: int + + +ID = TypeVar("ID", bound=BaseProtocol) + + +class BaseCRUD(Generic[ID]): + """Базовый класс для CRUD""" + + _table: type[ID] + + def __init__(self, table: type[ID], session: AsyncSession, logger) -> None: + """ + Инициализация класса для взаимодействия с таблицей + + :param table: Таблица + :param session: Сессия + :param logger: Логгер + """ + self._table = table + self._session = session + self._logger = logger + + async def get(self, pk_id: ID) -> ID: + """ + Получение объекта по id + + :param pk_id: Значение primary key таблицы + :return: Объект + """ + return await self._session.get(self._table, pk_id) + + async def get_all(self) -> list[ID]: + """ + Получение всех объектов из таблицы. Возвращает список объектов. + + :return: Список объектов + """ + result = await self._session.scalars(sa.select(self._table)) + return list(result) + + async def create(self, obj: ID) -> ID: + """ + Добавление объекта в таблицу. + + Возвращает добавленный объект. + + :raise + InvalidDataException: Если в таблице есть объект с таким id или неверно указаны данные + :param obj: + :return: + """ + try: + self._session.add(obj) + await self._session.commit() + except exc.IntegrityError as e: + self._logger.error(f"IntegrityError: {e}") + await self._session.rollback() + raise InvalidDataException from e + await self._session.refresh(obj) + return obj + + async def update(self, obj: ID) -> None: + """Обновление объекта в таблице""" + + async def delete(self, obj: ID) -> None: + """Удаление объекта из таблицы""" + +src\dataloader\cruds\quotes\schemas.py - +"""Pydantic DTO for incoming SuperTenera payloads.""" + +from datetime import date +from typing import Any + +from pydantic import BaseModel, Field, field_validator + + +class QuoteValueIn(BaseModel): + dt: date + price_o: float | None = Field(None, alias="price_o") + price_c: float | None = Field(None, alias="price_c") + price_h: float | None = Field(None, alias="price_h") + price_l: float | None = Field(None, alias="price_l") + volume: float | None = None + unit: str | None = None + key: int | None = None + value_profit: float | None = None + value_base: float | None = None + value_max: float | None = None + value_min: float | None = None + value_chng: float | None = None + value_chng_prc: float | None = None + price_i: float | None = None + value_day: float | None = None + value_prc: float | None = None + value_weekly_prc: float | None = None + value_monthly_prc: float | None = None + value_ytd_prc: float | None = None + value_yoy_prc: float | None = None + value_last: float | None = None + value_previous: float | None = None + + +class QuoteIn(BaseModel): + section_name: str + name: str + params: dict[str, Any] | None = None + srce: str | None = None + ticker: str | None = None + update_func: str | None = None + values: list[QuoteValueIn] + + @field_validator("values") + def non_empty(cls, v) -> Any: + if not v: + raise ValueError("values list must not be empty") + return v + +src\dataloader\cruds\quotes\crud.py - +"""CRUD-helpers for quotes-related tables.""" + +from __future__ import annotations + +from collections.abc import Sequence +from datetime import datetime + +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import insert as pg_insert + +from dataloader.context import APP_CTX +from dataloader.cruds import BaseCRUD, InvalidDataException +from dataloader.models import Quote, QuoteSection, QuoteValue + +logger = APP_CTX.get_logger() + + +class QuoteSectionCRUD(BaseCRUD[QuoteSection]): + """CRUD for QuoteSection.""" + + async def get_by_name(self, name: str) -> QuoteSection | None: + stmt = sa.select(QuoteSection).where(QuoteSection.name == name) + res = await self._session.scalar(stmt) + return res + + async def get_or_create(self, name: str, params: dict | None = None) -> QuoteSection: + existing = await self.get_by_name(name) + if existing: + return existing + + obj = QuoteSection(name=name, params=params or {}) + return await self.create(obj) + + +class QuoteCRUD(BaseCRUD[Quote]): + """CRUD for Quote with UPSERT-semantics.""" + + async def get_by_name( + self, + section_id: int, + name: str, + ) -> Quote | None: + stmt = sa.select(Quote).where( + Quote.quote_sect_id == section_id, + Quote.name == name, + ) + res = await self._session.scalar(stmt) + return res + + async def upsert( + self, + section: QuoteSection, + *, + name: str, + params: dict | None = None, + srce: str | None = None, + ticker: str | None = None, + ) -> Quote: + """Insert or update a quote and return the resulting row.""" + + try: + now = datetime.now(tz=APP_CTX.get_pytz_timezone()).replace(tzinfo=None) + + stmt = ( + pg_insert(Quote) + .values( + quote_sect_id=section.quote_sect_id, + name=name, + params=params, + srce=srce, + ticker=ticker, + last_update_dttm=now, + ) + .on_conflict_do_update( + index_elements=["quote_sect_id", "name"], + set_={ + "params": pg_insert(Quote).excluded.params, + "srce": pg_insert(Quote).excluded.srce, + "ticker": pg_insert(Quote).excluded.ticker, + "last_update_dttm": now, + }, + ) + .returning(Quote) + ) + + res = await self._session.scalar(stmt) + + if not res: + raise InvalidDataException("Failed to upsert quote") + await self._session.commit() + return res + except Exception as e: + await self._session.rollback() + raise e + + +class QuoteValueCRUD(BaseCRUD[QuoteValue]): + """CRUD for QuoteValue with bulk UPSERT.""" + + async def bulk_upsert( + self, + quote: Quote, + values: Sequence[dict], + ) -> None: + """Bulk insert / update values for a quote.""" + if not values: + return + + now = datetime.now(tz=APP_CTX.get_pytz_timezone()).replace(tzinfo=None) + quote_id = quote.quote_id + + update_columns = { + c.name + for c in QuoteValue.__table__.columns + if c.name not in {"quotes_values_id", "quote_id", "dt", "load_dttm"} + } + + payload = [ + { + "dt": item["dt"], + "quote_id": quote_id, + "load_dttm": now, + **{col: item.get(col) for col in update_columns}, + } + for item in values + ] + + insert_stmt = pg_insert(QuoteValue).values(payload) + + update_cols = {col: insert_stmt.excluded[col] for col in update_columns} + + stmt = insert_stmt.on_conflict_do_update( + index_elements=["quote_id", "dt"], + set_=update_cols, + ) + + await self._session.execute(stmt) + await self._session.commit() + + async def list_for_period( + self, + quote_id: int, + dt_from, + dt_to, + ) -> list[QuoteValue]: + stmt = ( + sa.select(QuoteValue) + .where( + QuoteValue.quote_id == quote_id, + QuoteValue.dt.between(dt_from, dt_to), + ) + .order_by(QuoteValue.dt) + ) + res = await self._session.scalars(stmt) + return list(res) + +src\dataloader\api\v1\service.py - +"""Бизнес-логика загрузки котировок из SuperTenera и сохранения в БД.""" + +from __future__ import annotations + +from datetime import date, datetime +from typing import Any + +from dataloader.context import APP_CTX +from dataloader.cruds.quotes.crud import QuoteCRUD, QuoteSectionCRUD, QuoteValueCRUD +from dataloader.interfaces.tenera.interface import get_async_tenera_interface +from dataloader.interfaces.tenera.schemas import ( + BloombergTimePoint, + CbrTimePoint, + InvestingCandlestick, + InvestingNumeric, + InvestingTimePoint, + SgxTimePoint, + TimePointUnion, + TradingEconomicsEmptyString, + TradingEconomicsLastPrev, + TradingEconomicsNumeric, + TradingEconomicsStringPercent, + TradingEconomicsStringTime, + TradingEconomicsTimePoint, + TradingViewTimePoint, +) +from dataloader.models import Quote, QuoteSection, QuoteValue + +logger = APP_CTX.get_logger() + + +def _to_float(value: str | int | float | None) -> float | None: + """Преобразует строковые числа с запятыми/процентами к float.""" + if value is None: + return None + if isinstance(value, int | float): + return float(value) + s = str(value).strip().replace(" ", "").replace("%", "").replace(",", ".") + if s == "": + return None + try: + return float(s) + except ValueError: + return None + + +def _parse_ts_to_datetime(ts: str) -> datetime | None: + """Преобразует строку с Unix timestamp в datetime без таймзоны, но в таймзоне приложения.""" + if not ts or not ts.strip().isdigit(): + return None + + try: + timestamp = int(ts.strip()) + dt_aware = datetime.fromtimestamp(timestamp, tz=APP_CTX.get_pytz_timezone()) + return dt_aware.replace(tzinfo=None) + except (ValueError, OSError, OverflowError): + return None + + +def _build_value_row(source: str, dt: date, point: Any) -> dict[str, Any] | None: # noqa: C901 + """Строит строку для `quotes_values` по источнику и типу точки.""" + if isinstance(point, int): + return {"dt": dt, "key": point} + + if isinstance(point, TimePointUnion): + inner = point.root + if isinstance(inner, InvestingTimePoint): + deep_inner = inner.root + if isinstance(deep_inner, InvestingNumeric): + return { + "dt": dt, + "value_profit": _to_float(deep_inner.profit), + "value_base": _to_float(deep_inner.base_value), + "value_max": _to_float(deep_inner.max_value), + "value_min": _to_float(deep_inner.min_value), + "value_chng": _to_float(deep_inner.change), + "value_chng_prc": _to_float(deep_inner.change_ptc), + } + + if isinstance(deep_inner, InvestingCandlestick): + return { + "dt": dt, + "price_o": _to_float(getattr(deep_inner, "open_", None) or getattr(deep_inner, "open", None)), + "price_h": _to_float(deep_inner.high), + "price_l": _to_float(deep_inner.low), + "price_c": _to_float(deep_inner.close), + "volume": _to_float(deep_inner.value), + } + + if isinstance(inner, TradingViewTimePoint | SgxTimePoint): + return { + "dt": dt, + "price_o": _to_float(getattr(inner, "open_", None) or getattr(inner, "open", None)), + "price_h": _to_float(inner.high), + "price_l": _to_float(inner.low), + "price_c": _to_float(inner.close), + "volume": _to_float( + getattr(inner, "volume", None) or getattr(inner, "interest", None) or getattr(inner, "value", None) + ), + } + + if isinstance(inner, BloombergTimePoint): + return { + "dt": dt, + "value_base": _to_float(inner.value), + } + + if isinstance(inner, CbrTimePoint): + return { + "dt": dt, + "value_base": _to_float(inner.value), + } + if isinstance(inner, TradingEconomicsTimePoint): + deep_inner = inner.root + + if isinstance(deep_inner, TradingEconomicsNumeric): + return { + "dt": dt, + "price_i": _to_float(deep_inner.price), + "value_day": _to_float(deep_inner.day), + "value_prc": _to_float(deep_inner.percent), + "value_weekly_prc": _to_float(deep_inner.weekly), + "value_monthly_prc": _to_float(deep_inner.monthly), + "value_ytd_prc": _to_float(deep_inner.ytd), + "value_yoy_prc": _to_float(deep_inner.yoy), + } + + if isinstance(deep_inner, TradingEconomicsLastPrev): + return { + "dt": dt, + "value_last": _to_float(deep_inner.last), + "value_previous": _to_float(deep_inner.previous), + "unit": str(deep_inner.unit) if deep_inner.unit is not None else None, + } + + if isinstance(deep_inner, TradingEconomicsStringPercent): + return { + "dt": dt, + "value_prc": _to_float(deep_inner.root), + } + + if isinstance(deep_inner, TradingEconomicsStringTime): + return None + + if isinstance(deep_inner, TradingEconomicsEmptyString): + return { + "dt": dt, + "is_empty_str_flg": True, + } + + return None + + +async def fetch_and_save_data() -> None: + """Загружает данные из SuperTenera и апсертом сохраняет их в БД.""" + logger.info("ETL start") + + async with get_async_tenera_interface() as tenera: + data = await tenera.get_quotes_data() + + async with APP_CTX.async_session_maker() as session: + section_crud = QuoteSectionCRUD(QuoteSection, session, logger) + quote_crud = QuoteCRUD(Quote, session, logger) + value_crud = QuoteValueCRUD(QuoteValue, session, logger) + + for source_name in ("cbr", "investing", "sgx", "tradingeconomics", "bloomberg", "trading_view"): + source_data = getattr(data, source_name) + if not source_data: + continue + + section = await section_crud.get_by_name(source_name) + if section is None: + logger.warning(f"Section ‘{source_name}’ not found. Skipping source.") + continue + + for instrument_name, instrument_data in source_data.items(): + quote = await quote_crud.upsert( + section=section, + name=instrument_name, + ) + rows: list[dict[str, Any]] = [] + for ts, tp in instrument_data.items(): + dt = _parse_ts_to_datetime(str(ts)) + if not dt: + continue + row = _build_value_row(source_name, dt, tp) + if row is None: + continue + rows.append(row) + + await value_crud.bulk_upsert(quote, rows) + + logger.info("ETL complete") + +src\dataloader\api\v1\router.py - +""" +Основное ядро API co всеми endpoint-ми. +""" + +from fastapi import APIRouter, BackgroundTasks + +from dataloader.context import APP_CTX + +from .service import fetch_and_save_data + +router = APIRouter() +logger = APP_CTX.get_logger() + + +@router.post("/trigger_data") +async def trigger_data(background_tasks: BackgroundTasks) -> dict[str, str]: + background_tasks.add_task(fetch_and_save_data) + return {"message": "ETL started"} + +и в конфиге - + +class SuperTenera(BaseAppSettings): + """ + Настройки интеграции с другими сервисами. + """ + + host: Annotated[str, BeforeValidator(strip_slashes)] = Field( + validation_alias="SUPERTENERA_HOST", default="ci03801737-ift-tenera-giga.delta.sbrf.ru/atlant360bc/" + ) + port: str = Field(validation_alias="SUPERTENERA_PORT", default="443") + + quotes_endpoint: Annotated[str, BeforeValidator(strip_slashes)] = Field( + validation_alias="SUPERTENERA_QUOTES_ENDPOINT", default="/get_gigaparser_quotes/" + ) + timeout: int = 20 + + @property + def base_url(self) -> str: + """Возвращает абсолютный URL""" + domain, raw_path = self.host.split("/", 1) if "/" in self.host else (self.host, "") + return build_url(self.protocol, domain, self.port, raw_path)