dataloader/PIPELINE_TENERA.md

1202 lines
40 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

Это вырезка файлов из ETL сервиса старого. Оно работало, но местами было не совсем оптимально.
нужно сделать из этого файла полноценный ETL задачу для нашего сервиса, раскидать куда надо интерфейс, модели и т.п и зарегистрировать задачу.
src\dataloader\interfaces\tenera\interface.py -
"""Интерфейс для взаимодействия с сервисом SuperTenera.
Позволяет запросить актуальные данные по котировкам с SuperTenera в json.
"""
import json
from pathlib import Path
import ssl
import uuid
from asyncio import TimeoutError
from datetime import datetime
from typing import Any, Literal, Self
import aiohttp
from dataloader.config import APP_CONFIG
from dataloader.interfaces.tenera.schemas import MainData
from dataloader.interfaces.utils import log_method_call
class SuperTeneraConnectionError(Exception):
"""Ошибка подключения к SuperTenera API."""
class SuperTeneraInterface:
"""Интерфейс для взаимодействия с сервисом SuperTenera по http."""
def __init__(
self,
logger,
base_url: str,
*,
timezone=None,
) -> None:
"""
Constructor.
Args:
logger: Logger instance.
base_url: base service url.
timezone: timezone for datetime objects.
"""
self.logger = logger
self.base_url = base_url
self.timezone = timezone
self._session: aiohttp.ClientSession | None = None
self._ssl_context = None
if APP_CONFIG.app.local:
self._ssl_context = ssl.create_default_context(cafile=APP_CONFIG.certs.ca_bundle_file)
self._ssl_context.load_cert_chain(certfile=APP_CONFIG.certs.cert_file, keyfile=APP_CONFIG.certs.key_file)
def form_base_headers(self) -> dict:
"""Form metadata for call."""
metadata_pairs = {
"request-id": str(uuid.uuid4()),
"request-time": str(datetime.now(tz=self.timezone).isoformat()),
"system-id": APP_CONFIG.app.system_id,
}
return {metakey: metavalue for metakey, metavalue in metadata_pairs.items() if metavalue}
async def __aenter__(self) -> Self:
"""Async context manager enter."""
self._session = aiohttp.ClientSession(
base_url=self.base_url,
connector=aiohttp.TCPConnector(limit=100),
headers=self.form_base_headers(),
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
"""Async context manager exit."""
if exc_val is not None:
self.logger.error(f"{exc_type}: {exc_val}")
await self._session.close()
async def _get_request(
self,
url: str,
encoding: str | None = None,
content_type: str | None = "application/json",
**kwargs,
) -> Any:
"""Get realization."""
kwargs["ssl"] = self._ssl_context
async with self._session.get(url, **kwargs) as response:
if APP_CONFIG.app.debug:
self.logger.debug(f"Response: {(await response.text(errors='ignore'))[:100]}")
return await response.json(encoding=encoding, content_type=content_type)
@log_method_call
async def get_quotes_data(self) -> MainData:
"""Получить данные котировок от SuperTenera"""
data = await self._get_request(APP_CONFIG.supertenera.quotes_endpoint)
# mock_path = Path(__file__).parent / "supertenera_response.json"
# with open(mock_path, "r", encoding="utf-8") as f:
# data = json.load(f)
return MainData.model_validate(data)
@log_method_call
async def ping(self, **kwargs) -> Literal[True]:
"""
Быстрая проверка доступности SuperTenera API.
True - если ответ < 400, иначе SuperTeneraConnectionError.
"""
kwargs["ssl"] = self._ssl_context
try:
async with self._session.get(
APP_CONFIG.supertenera.quotes_endpoint, timeout=APP_CONFIG.supertenera.timeout, **kwargs
) as resp:
resp.raise_for_status()
return True
except aiohttp.ClientResponseError as e:
raise SuperTeneraConnectionError(
f"Ошибка подключения к SuperTenera API при проверке системы - {e.status}."
) from e
except TimeoutError as e:
raise SuperTeneraConnectionError(
f"Ошибка Timeout подключения к SuperTenera API при проверке системы."
) from e
def get_async_tenera_interface() -> SuperTeneraInterface:
"""Get SuperTenera instance."""
from dataloader.context import APP_CTX
return SuperTeneraInterface(
logger=APP_CTX.get_logger(),
base_url=APP_CTX.get_tenera_base_url(),
timezone=APP_CTX.get_pytz_timezone(),
)
src\dataloader\interfaces\tenera\schemas.py -
"""Схемы ответов"""
from __future__ import annotations
import re
from typing import Literal
from pydantic import BaseModel, ConfigDict, Field, RootModel, field_validator
class TeneraBaseModel(BaseModel):
"""Базовая модель для всех схем SuperTenera с настройкой populate_by_name."""
model_config = ConfigDict(
populate_by_name=True,
extra="ignore",
)
# --- TimePoint models ---
class EmptyTimePoint(TeneraBaseModel):
"""
Модель заглушка для полностью пустых значений в точке времени.
Позволяет корректно валидировать случаи, когда JSON поле {} без содержимого.
"""
pass # pylint: disable=unnecessary-pass
class CbrTimePoint(TeneraBaseModel):
"""
Структура данных точки времени для источника Центрального банка России (ЦБР).
Поля:
- value: Строка с числовым значением ("80,32")
"""
value: str
class InvestingNumeric(TeneraBaseModel):
"""
Структура данных точки времени для источника Investing.com в формате по странам.
Поля (alias на русском):
- profit: Доходность
- base_value: Базовое
- max_value: Максимальное
- min_value: Минимальное
- change: Изменение
- change_ptc: Процент изменений
"""
profit: str = Field(alias="Доходность")
base_value: str = Field(alias="Осн.")
max_value: str = Field(alias="Макс.")
min_value: str = Field(alias="Мин.")
change: str = Field(alias="Изм.")
change_ptc: str = Field(alias="Изм. %")
class InvestingCandlestick(TeneraBaseModel):
"""
Структура данных точки времени для источника Investing.com в формате свечи.
Поля (alias латинскими заглавными буквами):
- open_: "O"
- high: "H"
- low: "L"
- close: "C"
- interest: "I" | None
- value: "V"
"""
open_: str = Field(alias="O")
high: str = Field(alias="H")
low: str = Field(alias="L")
close: str = Field(alias="C")
interest: str | None = Field(alias="I")
value: str = Field(alias="V")
class InvestingTimePoint(RootModel[EmptyTimePoint | InvestingNumeric | InvestingCandlestick]):
"""
Union-модель точки времени для источника Investing.com.
1) {} -> EmptyTImePoint
2) numeric -> InvestingNumeric
3) свечной -> InvestingCandlestick
"""
pass # pylint: disable=unnecessary-pass
class SgxTimePoint(TeneraBaseModel):
"""
Структура данных точки времени для источника Сингапурской биржи (SGX).
Поля (alias латинскими заглавными буквами):
- open_: "O"
- high: "H"
- low: "L"
- close: "C"
- interest: "I"
- value: "V"
"""
open_: str | None = Field(alias="O")
high: str | None = Field(alias="H")
low: str | None = Field(alias="L")
close: str | None = Field(alias="C")
interest: str | None = Field(alias="I")
value: str | None = Field(alias="V")
class TradingEconomicsEmptyString(RootModel[str]):
"""
Валидирует точно пустую строку ("").
Используется для точек данных TradingEconomics, содержащих пустые строковые значения.
Поля:
- root: Строка, которая должна быть точно пустой ("")
"""
root: str
@field_validator("root", mode="before")
@classmethod
def _must_be_empty(cls, v) -> Literal[""]:
if v == "":
return v
raise ValueError("not an empty string")
class TradingEconomicsStringPercent(RootModel[str]):
"""
Валидирует строки-проценты вида "3.1%" или "-0,14%".
Принимает как запятую, так и точку в качестве десятичного разделителя.
Шаблон: опциональный минус, цифры, опциональная десятичная часть, знак процента.
Поля:
root: Строка с процентным значением (например: "3.1%", "-0,14%", "15%")
"""
root: str
@field_validator("root")
@classmethod
def _check_percent(cls, v) -> str:
if isinstance(v, str) and re.match(r"^-?\d+(?:[.,]\d+)?%$", v):
return v
raise ValueError(f"invalid percent string: {v!r}")
class TradingEconomicsStringTime(RootModel[str]):
"""
Валидирует строки времени в формате "h:mm AM/PM".
Примеры: "01:15 AM", "12:30 PM", "9:45 AM"
Поля:
root: Строка времени в 12-часовом формате с AM/PM
"""
root: str
@field_validator("root")
@classmethod
def _check_time(cls, v) -> str:
if isinstance(v, str) and re.match(r"^(0?[1-9]|1[0-2]):[0-5]\d\s[AP]M$", v):
return v
raise ValueError(f"invalid time string: {v!r}")
class TradingEconomicsNumeric(TeneraBaseModel):
"""
Полный числовой формат данных от TradingEconomics.
Содержит полную рыночную информацию с ценой, дневным изменением, процентами
и различными периодическими изменениями (недельными, месячными, с начала года, год к году).
Поля:
price: Текущая цена инструмента (алиас: "Price")
day: Дневное изменение в абсолютных значениях (алиас: "Day")
percent: Дневное изменение в процентах (алиас: "%")
weekly: Недельное изменение (алиас: "Weekly")
monthly: Месячное изменение (алиас: "Monthly")
ytd: Изменение с начала года (алиас: "YTD")
yoy: Изменение год к году (алиас: "YoY")
"""
price: str = Field(alias="Price")
day: str = Field(alias="Day")
percent: str = Field(alias="%")
weekly: str = Field(alias="Weekly")
monthly: str = Field(alias="Monthly")
ytd: str = Field(alias="YTD")
yoy: str = Field(alias="YoY")
class TradingEconomicsLastPrev(TeneraBaseModel):
"""
Формат Last/Previous/Unit от TradingEconomics.
Содержит текущее значение, предыдущее значение и единицу измерения.
Обычно используется для экономических индикаторов и статистики.
Поля:
last: Последнее (текущее) значение показателя (алиас: "Last")
previous: Предыдущее значение показателя (алиас: "Previous")
unit: Единица измерения показателя (алиас: "Unit")
"""
last: str = Field(alias="Last")
previous: str = Field(alias="Previous")
unit: str = Field(alias="Unit")
class TradingEconomicsTimePoint(
RootModel[
EmptyTimePoint
| TradingEconomicsEmptyString
| TradingEconomicsStringPercent
| TradingEconomicsStringTime
| TradingEconomicsNumeric
| TradingEconomicsLastPrev
]
):
"""
Объединение всех возможных форматов точек времени TradingEconomics.
Поддерживает:
- Пустые объекты ({})
- Пустые строки ("")
- Строки-проценты ("3.1%", "-0,14%")
- Строки времени ("01:15 AM")
- Полные числовые объекты с полями цена/день/%
- Объекты Last/Previous/Unit для экономических индикаторов
Поля:
root: Один из поддерживаемых типов точек времени TradingEconomics
"""
pass # pylint: disable=unnecessary-pass
class BloombergTimePoint(TeneraBaseModel):
"""
Структура данных точки времени для источника Bloomberg.
Поля:
- value: Строка с числовым значением ("80,32")
"""
value: str
class TradingViewTimePoint(TeneraBaseModel):
"""
Структура данных точки времени для источника TradingView.
Поля (alias латинскими заглавными буквами):
- open_: "O"
- high: "H"
- low: "L"
- close: "C"
- volume: "Vol"
"""
open_: str | None = Field(alias="O")
high: str | None = Field(alias="H")
low: str | None = Field(alias="L")
close: str | None = Field(alias="C")
volume: str | None = Field(alias="Vol")
class TimePointUnion(
RootModel[
EmptyTimePoint
| CbrTimePoint
| InvestingTimePoint
| SgxTimePoint
| TradingEconomicsTimePoint
| BloombergTimePoint
| TradingViewTimePoint
]
):
"""
Универсальное объединение для точек времени от всех поддерживаемых источников данных.
Обрабатывает структуры данных от:
- ЦБР (Центральный банк России)
- Investing.com
- SGX (Сингапурская биржа)
- TradingEconomics
- Bloomberg
- TradingView
Поля:
root: Точка времени от любого из поддерживаемых источников данных
"""
pass # pylint: disable=unnecessary-pass
InstrumentData = dict[str | int, TimePointUnion]
"""Тип: Отображение timestamp -> TimePointUnion."""
SourceData = dict[str, InstrumentData]
"""Тип: Отображение имени инструмента -> InstrumentData."""
class MainData(TeneraBaseModel):
"""
Основной контейнер данных для всех источников финансовых данных от SuperTenera.
Содержит опциональные данные от нескольких поставщиков финансовых данных,
структурированные по источникам, а затем по инструментам.
Поля:
cbr: Данные от Центрального банка России (опционально)
investing: Данные от Investing.com (опционально)
sgx: Данные от Сингапурской биржи (опционально)
tradingeconomics: Данные от TradingEconomics (опционально)
bloomberg: Данные от Bloomberg (опционально)
trading_view: Данные от TradingView (опционально, алиас: "trading_view")
"""
cbr: SourceData | None = None
investing: SourceData | None = None
sgx: SourceData | None = None
tradingeconomics: SourceData | None = None
bloomberg: SourceData | None = None
trading_view: SourceData | None = Field(default=None, alias="trading_view")
@field_validator("investing", mode="before")
@classmethod
def _filter_investing(cls, v) -> SourceData | None:
"""
Фильтрация данных от Investing.com.
Убираем:
- все ключи, у которых значение null
- все ключи, которые выглядят как чистые числа (timestamps)
:param v: Объект с данными от Investing.com
:return: Отфильтрованный объект
"""
if isinstance(v, dict):
return {key: value for key, value in v.items() if value is not None and not str(key).isdigit()}
return v
src\dataloader\interfaces\tenera\__init__.py -
from . import schemas
from .interface import SuperTeneraInterface, get_async_tenera_interface
__all__ = [
"schemas",
"SuperTeneraInterface",
"get_async_tenera_interface",
]
src\dataloader\models\quote.py -
"""Quote модель."""
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING
from sqlalchemy import JSON, TIMESTAMP, BigInteger, ForeignKey, String, UniqueConstraint, func
from sqlalchemy.orm import Mapped, mapped_column, relationship
from dataloader.base import Base
if TYPE_CHECKING:
from .quote_section import QuoteSection
from .quote_value import QuoteValue
class Quote(Base):
"""Представляет custom_cib_quotes.quotes."""
__tablename__ = "quotes"
__table_args__ = (UniqueConstraint("quote_sect_id", "name", name="ak_uq_quote_name_and_quotes"),)
quote_id: Mapped[int] = mapped_column(BigInteger(), primary_key=True)
name: Mapped[str] = mapped_column(String, nullable=False)
params: Mapped[dict | None] = mapped_column(JSON)
srce: Mapped[str | None] = mapped_column(String)
ticker: Mapped[str | None] = mapped_column(String)
quote_sect_id: Mapped[int] = mapped_column(
ForeignKey("quotes_sect.quote_sect_id", ondelete="CASCADE", onupdate="CASCADE"),
nullable=False,
)
last_update_dttm: Mapped[datetime | None] = mapped_column(TIMESTAMP(timezone=True))
load_dttm: Mapped[datetime] = mapped_column(
TIMESTAMP(timezone=False),
nullable=False,
server_default=func.current_timestamp(),
)
section: Mapped[QuoteSection] = relationship(back_populates="quotes")
values: Mapped[list[QuoteValue]] = relationship(
back_populates="quote",
cascade="all, delete-orphan",
)
def __repr__(self) -> str:
return f"<Quote id={self.quote_id} name={self.name}>"
src\dataloader\models\quote_value.py -
"""Quote-value модель."""
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING
from sqlalchemy import TIMESTAMP, BigInteger, DateTime, Float, ForeignKey, String, UniqueConstraint, func
from sqlalchemy.orm import Mapped, mapped_column, relationship
from dataloader.base import Base
if TYPE_CHECKING:
from .quote import Quote
class QuoteValue(Base):
"""Представляет custom_cib_quotes.quotes_values."""
__tablename__ = "quotes_values"
__table_args__ = (UniqueConstraint("quote_id", "dt", name="ak_uq_quote_and_date_quotes"),)
quotes_values_id: Mapped[int] = mapped_column(
BigInteger(),
primary_key=True,
autoincrement=True,
)
quote_id: Mapped[int] = mapped_column(
ForeignKey("quotes.quote_id", ondelete="RESTRICT", onupdate="RESTRICT"),
nullable=False,
)
dt: Mapped[datetime] = mapped_column(DateTime, nullable=False)
price_o: Mapped[float | None] = mapped_column(Float)
price_c: Mapped[float | None] = mapped_column(Float)
price_h: Mapped[float | None] = mapped_column(Float)
price_l: Mapped[float | None] = mapped_column(Float)
volume: Mapped[float | None] = mapped_column(Float)
load_dttm: Mapped[datetime] = mapped_column(
TIMESTAMP(timezone=False),
nullable=False,
server_default=func.current_timestamp(),
)
unit: Mapped[str | None] = mapped_column(String)
key: Mapped[int | None] = mapped_column(BigInteger())
value_profit: Mapped[float | None] = mapped_column(Float)
value_base: Mapped[float | None] = mapped_column(Float)
value_max: Mapped[float | None] = mapped_column(Float)
value_min: Mapped[float | None] = mapped_column(Float)
value_chng: Mapped[float | None] = mapped_column(Float)
value_chng_prc: Mapped[float | None] = mapped_column(Float)
price_i: Mapped[float | None] = mapped_column(Float)
price: Mapped[float | None] = mapped_column(Float)
value_day: Mapped[float | None] = mapped_column(Float)
value_prc: Mapped[float | None] = mapped_column(Float)
value_weekly_prc: Mapped[float | None] = mapped_column(Float)
value_monthly_prc: Mapped[float | None] = mapped_column(Float)
value_ytd_prc: Mapped[float | None] = mapped_column(Float)
value_yoy_prc: Mapped[float | None] = mapped_column(Float)
value_last: Mapped[float | None] = mapped_column(Float)
value_previous: Mapped[float | None] = mapped_column(Float)
is_empty_str_flg: Mapped[bool | None] = mapped_column()
interest: Mapped[float | None] = mapped_column(Float)
quote: Mapped[Quote] = relationship(back_populates="values")
def __repr__(self) -> str:
return f"<QuoteValue id={self.quotes_values_id} dt={self.dt}>"
src\dataloader\models\quote_section.py -
"""Quote-section модель."""
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING
from sqlalchemy import JSON, TIMESTAMP, Integer, Sequence, String, func
from sqlalchemy.orm import Mapped, mapped_column, relationship
from dataloader.base import Base
if TYPE_CHECKING:
from .quote import Quote
class QuoteSection(Base):
"""Представляет custom_cib_quotes.quotes_sect."""
__tablename__ = "quotes_sect"
quote_sect_id: Mapped[int] = mapped_column(Integer(), Sequence("quotes_section_id_seq"), primary_key=True)
name: Mapped[str] = mapped_column(String, nullable=False)
params: Mapped[dict | None] = mapped_column(JSON)
load_dttm: Mapped[datetime] = mapped_column(
TIMESTAMP(timezone=False),
nullable=False,
server_default=func.current_timestamp(),
)
quotes: Mapped[list[Quote]] = relationship(
back_populates="section",
cascade="all, delete-orphan",
)
def __repr__(self) -> str:
return f"<QuoteSection id={self.quote_sect_id} name={self.name}>"
src\dataloader\cruds\base.py -
"""Базовый класс для CRUD"""
from typing import Generic, Protocol, TypeVar
import sqlalchemy as sa
from sqlalchemy import exc
from sqlalchemy.ext.asyncio import AsyncSession
from dataloader.cruds.exceptions import InvalidDataException
class BaseProtocol(Protocol): # pylint: disable=too-few-public-methods
"""Протокол для таблиц с id"""
id: int
ID = TypeVar("ID", bound=BaseProtocol)
class BaseCRUD(Generic[ID]):
"""Базовый класс для CRUD"""
_table: type[ID]
def __init__(self, table: type[ID], session: AsyncSession, logger) -> None:
"""
Инициализация класса для взаимодействия с таблицей
:param table: Таблица
:param session: Сессия
:param logger: Логгер
"""
self._table = table
self._session = session
self._logger = logger
async def get(self, pk_id: ID) -> ID:
"""
Получение объекта по id
:param pk_id: Значение primary key таблицы
:return: Объект
"""
return await self._session.get(self._table, pk_id)
async def get_all(self) -> list[ID]:
"""
Получение всех объектов из таблицы. Возвращает список объектов.
:return: Список объектов
"""
result = await self._session.scalars(sa.select(self._table))
return list(result)
async def create(self, obj: ID) -> ID:
"""
Добавление объекта в таблицу.
Возвращает добавленный объект.
:raise
InvalidDataException: Если в таблице есть объект с таким id или неверно указаны данные
:param obj:
:return:
"""
try:
self._session.add(obj)
await self._session.commit()
except exc.IntegrityError as e:
self._logger.error(f"IntegrityError: {e}")
await self._session.rollback()
raise InvalidDataException from e
await self._session.refresh(obj)
return obj
async def update(self, obj: ID) -> None:
"""Обновление объекта в таблице"""
async def delete(self, obj: ID) -> None:
"""Удаление объекта из таблицы"""
src\dataloader\cruds\quotes\schemas.py -
"""Pydantic DTO for incoming SuperTenera payloads."""
from datetime import date
from typing import Any
from pydantic import BaseModel, Field, field_validator
class QuoteValueIn(BaseModel):
dt: date
price_o: float | None = Field(None, alias="price_o")
price_c: float | None = Field(None, alias="price_c")
price_h: float | None = Field(None, alias="price_h")
price_l: float | None = Field(None, alias="price_l")
volume: float | None = None
unit: str | None = None
key: int | None = None
value_profit: float | None = None
value_base: float | None = None
value_max: float | None = None
value_min: float | None = None
value_chng: float | None = None
value_chng_prc: float | None = None
price_i: float | None = None
value_day: float | None = None
value_prc: float | None = None
value_weekly_prc: float | None = None
value_monthly_prc: float | None = None
value_ytd_prc: float | None = None
value_yoy_prc: float | None = None
value_last: float | None = None
value_previous: float | None = None
class QuoteIn(BaseModel):
section_name: str
name: str
params: dict[str, Any] | None = None
srce: str | None = None
ticker: str | None = None
update_func: str | None = None
values: list[QuoteValueIn]
@field_validator("values")
def non_empty(cls, v) -> Any:
if not v:
raise ValueError("values list must not be empty")
return v
src\dataloader\cruds\quotes\crud.py -
"""CRUD-helpers for quotes-related tables."""
from __future__ import annotations
from collections.abc import Sequence
from datetime import datetime
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import insert as pg_insert
from dataloader.context import APP_CTX
from dataloader.cruds import BaseCRUD, InvalidDataException
from dataloader.models import Quote, QuoteSection, QuoteValue
logger = APP_CTX.get_logger()
class QuoteSectionCRUD(BaseCRUD[QuoteSection]):
"""CRUD for QuoteSection."""
async def get_by_name(self, name: str) -> QuoteSection | None:
stmt = sa.select(QuoteSection).where(QuoteSection.name == name)
res = await self._session.scalar(stmt)
return res
async def get_or_create(self, name: str, params: dict | None = None) -> QuoteSection:
existing = await self.get_by_name(name)
if existing:
return existing
obj = QuoteSection(name=name, params=params or {})
return await self.create(obj)
class QuoteCRUD(BaseCRUD[Quote]):
"""CRUD for Quote with UPSERT-semantics."""
async def get_by_name(
self,
section_id: int,
name: str,
) -> Quote | None:
stmt = sa.select(Quote).where(
Quote.quote_sect_id == section_id,
Quote.name == name,
)
res = await self._session.scalar(stmt)
return res
async def upsert(
self,
section: QuoteSection,
*,
name: str,
params: dict | None = None,
srce: str | None = None,
ticker: str | None = None,
) -> Quote:
"""Insert or update a quote and return the resulting row."""
try:
now = datetime.now(tz=APP_CTX.get_pytz_timezone()).replace(tzinfo=None)
stmt = (
pg_insert(Quote)
.values(
quote_sect_id=section.quote_sect_id,
name=name,
params=params,
srce=srce,
ticker=ticker,
last_update_dttm=now,
)
.on_conflict_do_update(
index_elements=["quote_sect_id", "name"],
set_={
"params": pg_insert(Quote).excluded.params,
"srce": pg_insert(Quote).excluded.srce,
"ticker": pg_insert(Quote).excluded.ticker,
"last_update_dttm": now,
},
)
.returning(Quote)
)
res = await self._session.scalar(stmt)
if not res:
raise InvalidDataException("Failed to upsert quote")
await self._session.commit()
return res
except Exception as e:
await self._session.rollback()
raise e
class QuoteValueCRUD(BaseCRUD[QuoteValue]):
"""CRUD for QuoteValue with bulk UPSERT."""
async def bulk_upsert(
self,
quote: Quote,
values: Sequence[dict],
) -> None:
"""Bulk insert / update values for a quote."""
if not values:
return
now = datetime.now(tz=APP_CTX.get_pytz_timezone()).replace(tzinfo=None)
quote_id = quote.quote_id
update_columns = {
c.name
for c in QuoteValue.__table__.columns
if c.name not in {"quotes_values_id", "quote_id", "dt", "load_dttm"}
}
payload = [
{
"dt": item["dt"],
"quote_id": quote_id,
"load_dttm": now,
**{col: item.get(col) for col in update_columns},
}
for item in values
]
insert_stmt = pg_insert(QuoteValue).values(payload)
update_cols = {col: insert_stmt.excluded[col] for col in update_columns}
stmt = insert_stmt.on_conflict_do_update(
index_elements=["quote_id", "dt"],
set_=update_cols,
)
await self._session.execute(stmt)
await self._session.commit()
async def list_for_period(
self,
quote_id: int,
dt_from,
dt_to,
) -> list[QuoteValue]:
stmt = (
sa.select(QuoteValue)
.where(
QuoteValue.quote_id == quote_id,
QuoteValue.dt.between(dt_from, dt_to),
)
.order_by(QuoteValue.dt)
)
res = await self._session.scalars(stmt)
return list(res)
src\dataloader\api\v1\service.py -
"""Бизнес-логика загрузки котировок из SuperTenera и сохранения в БД."""
from __future__ import annotations
from datetime import date, datetime
from typing import Any
from dataloader.context import APP_CTX
from dataloader.cruds.quotes.crud import QuoteCRUD, QuoteSectionCRUD, QuoteValueCRUD
from dataloader.interfaces.tenera.interface import get_async_tenera_interface
from dataloader.interfaces.tenera.schemas import (
BloombergTimePoint,
CbrTimePoint,
InvestingCandlestick,
InvestingNumeric,
InvestingTimePoint,
SgxTimePoint,
TimePointUnion,
TradingEconomicsEmptyString,
TradingEconomicsLastPrev,
TradingEconomicsNumeric,
TradingEconomicsStringPercent,
TradingEconomicsStringTime,
TradingEconomicsTimePoint,
TradingViewTimePoint,
)
from dataloader.models import Quote, QuoteSection, QuoteValue
logger = APP_CTX.get_logger()
def _to_float(value: str | int | float | None) -> float | None:
"""Преобразует строковые числа с запятыми/процентами к float."""
if value is None:
return None
if isinstance(value, int | float):
return float(value)
s = str(value).strip().replace(" ", "").replace("%", "").replace(",", ".")
if s == "":
return None
try:
return float(s)
except ValueError:
return None
def _parse_ts_to_datetime(ts: str) -> datetime | None:
"""Преобразует строку с Unix timestamp в datetime без таймзоны, но в таймзоне приложения."""
if not ts or not ts.strip().isdigit():
return None
try:
timestamp = int(ts.strip())
dt_aware = datetime.fromtimestamp(timestamp, tz=APP_CTX.get_pytz_timezone())
return dt_aware.replace(tzinfo=None)
except (ValueError, OSError, OverflowError):
return None
def _build_value_row(source: str, dt: date, point: Any) -> dict[str, Any] | None: # noqa: C901
"""Строит строку для `quotes_values` по источнику и типу точки."""
if isinstance(point, int):
return {"dt": dt, "key": point}
if isinstance(point, TimePointUnion):
inner = point.root
if isinstance(inner, InvestingTimePoint):
deep_inner = inner.root
if isinstance(deep_inner, InvestingNumeric):
return {
"dt": dt,
"value_profit": _to_float(deep_inner.profit),
"value_base": _to_float(deep_inner.base_value),
"value_max": _to_float(deep_inner.max_value),
"value_min": _to_float(deep_inner.min_value),
"value_chng": _to_float(deep_inner.change),
"value_chng_prc": _to_float(deep_inner.change_ptc),
}
if isinstance(deep_inner, InvestingCandlestick):
return {
"dt": dt,
"price_o": _to_float(getattr(deep_inner, "open_", None) or getattr(deep_inner, "open", None)),
"price_h": _to_float(deep_inner.high),
"price_l": _to_float(deep_inner.low),
"price_c": _to_float(deep_inner.close),
"volume": _to_float(deep_inner.value),
}
if isinstance(inner, TradingViewTimePoint | SgxTimePoint):
return {
"dt": dt,
"price_o": _to_float(getattr(inner, "open_", None) or getattr(inner, "open", None)),
"price_h": _to_float(inner.high),
"price_l": _to_float(inner.low),
"price_c": _to_float(inner.close),
"volume": _to_float(
getattr(inner, "volume", None) or getattr(inner, "interest", None) or getattr(inner, "value", None)
),
}
if isinstance(inner, BloombergTimePoint):
return {
"dt": dt,
"value_base": _to_float(inner.value),
}
if isinstance(inner, CbrTimePoint):
return {
"dt": dt,
"value_base": _to_float(inner.value),
}
if isinstance(inner, TradingEconomicsTimePoint):
deep_inner = inner.root
if isinstance(deep_inner, TradingEconomicsNumeric):
return {
"dt": dt,
"price_i": _to_float(deep_inner.price),
"value_day": _to_float(deep_inner.day),
"value_prc": _to_float(deep_inner.percent),
"value_weekly_prc": _to_float(deep_inner.weekly),
"value_monthly_prc": _to_float(deep_inner.monthly),
"value_ytd_prc": _to_float(deep_inner.ytd),
"value_yoy_prc": _to_float(deep_inner.yoy),
}
if isinstance(deep_inner, TradingEconomicsLastPrev):
return {
"dt": dt,
"value_last": _to_float(deep_inner.last),
"value_previous": _to_float(deep_inner.previous),
"unit": str(deep_inner.unit) if deep_inner.unit is not None else None,
}
if isinstance(deep_inner, TradingEconomicsStringPercent):
return {
"dt": dt,
"value_prc": _to_float(deep_inner.root),
}
if isinstance(deep_inner, TradingEconomicsStringTime):
return None
if isinstance(deep_inner, TradingEconomicsEmptyString):
return {
"dt": dt,
"is_empty_str_flg": True,
}
return None
async def fetch_and_save_data() -> None:
"""Загружает данные из SuperTenera и апсертом сохраняет их в БД."""
logger.info("ETL start")
async with get_async_tenera_interface() as tenera:
data = await tenera.get_quotes_data()
async with APP_CTX.async_session_maker() as session:
section_crud = QuoteSectionCRUD(QuoteSection, session, logger)
quote_crud = QuoteCRUD(Quote, session, logger)
value_crud = QuoteValueCRUD(QuoteValue, session, logger)
for source_name in ("cbr", "investing", "sgx", "tradingeconomics", "bloomberg", "trading_view"):
source_data = getattr(data, source_name)
if not source_data:
continue
section = await section_crud.get_by_name(source_name)
if section is None:
logger.warning(f"Section {source_name} not found. Skipping source.")
continue
for instrument_name, instrument_data in source_data.items():
quote = await quote_crud.upsert(
section=section,
name=instrument_name,
)
rows: list[dict[str, Any]] = []
for ts, tp in instrument_data.items():
dt = _parse_ts_to_datetime(str(ts))
if not dt:
continue
row = _build_value_row(source_name, dt, tp)
if row is None:
continue
rows.append(row)
await value_crud.bulk_upsert(quote, rows)
logger.info("ETL complete")
src\dataloader\api\v1\router.py -
"""
Основное ядро API co всеми endpoint-ми.
"""
from fastapi import APIRouter, BackgroundTasks
from dataloader.context import APP_CTX
from .service import fetch_and_save_data
router = APIRouter()
logger = APP_CTX.get_logger()
@router.post("/trigger_data")
async def trigger_data(background_tasks: BackgroundTasks) -> dict[str, str]:
background_tasks.add_task(fetch_and_save_data)
return {"message": "ETL started"}
и в конфиге -
class SuperTenera(BaseAppSettings):
"""
Настройки интеграции с другими сервисами.
"""
host: Annotated[str, BeforeValidator(strip_slashes)] = Field(
validation_alias="SUPERTENERA_HOST", default="ci03801737-ift-tenera-giga.delta.sbrf.ru/atlant360bc/"
)
port: str = Field(validation_alias="SUPERTENERA_PORT", default="443")
quotes_endpoint: Annotated[str, BeforeValidator(strip_slashes)] = Field(
validation_alias="SUPERTENERA_QUOTES_ENDPOINT", default="/get_gigaparser_quotes/"
)
timeout: int = 20
@property
def base_url(self) -> str:
"""Возвращает абсолютный URL"""
domain, raw_path = self.host.split("/", 1) if "/" in self.host else (self.host, "")
return build_url(self.protocol, domain, self.port, raw_path)