dataloader/tests/unit/test_pipeline_load_tenera.py

537 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Unit тесты для пайплайна load_tenera."""
from __future__ import annotations
from datetime import datetime
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
import pytz
from dataloader.interfaces.tenera.schemas import (
BloombergTimePoint,
CbrTimePoint,
InvestingCandlestick,
InvestingNumeric,
InvestingTimePoint,
MainData,
SgxTimePoint,
TimePointUnion,
TradingEconomicsEmptyString,
TradingEconomicsLastPrev,
TradingEconomicsNumeric,
TradingEconomicsStringPercent,
TradingEconomicsStringTime,
TradingEconomicsTimePoint,
TradingViewTimePoint,
)
from dataloader.workers.pipelines.load_tenera import (
_build_value_row,
_parse_ts_to_datetime,
_process_source,
_to_float,
load_tenera,
)
@pytest.mark.unit
class TestToFloat:
    """Tests for the _to_float helper."""

    def test_converts_int_to_float(self):
        """An int input is returned as the equal float value."""
        converted = _to_float(42)
        assert converted == 42.0

    def test_converts_float_to_float(self):
        """A float input is returned unchanged."""
        converted = _to_float(3.14)
        assert converted == 3.14

    def test_converts_string_number_to_float(self):
        """A string holding a decimal number is parsed."""
        converted = _to_float("123.45")
        assert converted == 123.45

    def test_converts_string_with_comma_to_float(self):
        """A string using a comma as the decimal separator is parsed."""
        converted = _to_float("123,45")
        assert converted == 123.45

    def test_converts_string_with_percent_to_float(self):
        """A string with a trailing percent sign is parsed."""
        converted = _to_float("12.5%")
        assert converted == 12.5

    def test_converts_string_with_spaces_to_float(self):
        """A string padded with spaces is parsed."""
        converted = _to_float(" 123.45 ")
        assert converted == 123.45

    def test_returns_none_for_none(self):
        """None is passed through as None."""
        converted = _to_float(None)
        assert converted is None

    def test_returns_none_for_empty_string(self):
        """Empty and whitespace-only strings produce None."""
        for blank in ("", " "):
            assert _to_float(blank) is None

    def test_returns_none_for_invalid_string(self):
        """Strings that are not numbers produce None."""
        for garbage in ("invalid", "abc123"):
            assert _to_float(garbage) is None
@pytest.mark.unit
class TestParseTsToDatetime:
    """Tests for the _parse_ts_to_datetime helper."""

    @patch("dataloader.workers.pipelines.load_tenera.APP_CTX")
    def test_parses_valid_timestamp(self, mock_ctx):
        """A digit-only timestamp string yields a naive datetime."""
        moscow = pytz.timezone("Europe/Moscow")
        mock_ctx.pytz_timezone = moscow

        parsed = _parse_ts_to_datetime("1609459200")

        assert parsed is not None
        assert isinstance(parsed, datetime)
        # The returned value must carry no tzinfo.
        assert parsed.tzinfo is None

    @patch("dataloader.workers.pipelines.load_tenera.APP_CTX")
    def test_parses_timestamp_with_whitespace(self, mock_ctx):
        """Surrounding whitespace does not prevent parsing."""
        moscow = pytz.timezone("Europe/Moscow")
        mock_ctx.pytz_timezone = moscow

        parsed = _parse_ts_to_datetime(" 1609459200 ")

        assert parsed is not None
        assert isinstance(parsed, datetime)

    def test_returns_none_for_empty_string(self):
        """Empty and whitespace-only strings produce None."""
        for blank in ("", " "):
            assert _parse_ts_to_datetime(blank) is None

    def test_returns_none_for_non_digit_string(self):
        """Strings containing non-digit characters produce None."""
        for not_a_timestamp in ("abc123", "2025-01-15"):
            assert _parse_ts_to_datetime(not_a_timestamp) is None

    @patch("dataloader.workers.pipelines.load_tenera.APP_CTX")
    def test_handles_invalid_timestamp(self, mock_ctx):
        """An all-digit value that is not a usable timestamp produces None."""
        moscow = pytz.timezone("Europe/Moscow")
        mock_ctx.pytz_timezone = moscow

        parsed = _parse_ts_to_datetime("999999999999999")

        assert parsed is None
@pytest.mark.unit
class TestBuildValueRow:
    """Tests for the _build_value_row helper."""

    def test_handles_int_point(self):
        """A plain int point is turned into a row."""
        moment = datetime(2025, 1, 15, 12, 0, 0)
        expected = {"dt": moment, "key": 42}

        row = _build_value_row("test", moment, 42)

        assert row == expected

    def test_handles_investing_numeric(self):
        """An InvestingNumeric point maps onto the value_* columns."""
        moment = datetime(2025, 1, 15, 12, 0, 0)
        numeric = InvestingNumeric(
            profit="1.5%",
            base_value="100.0",
            max_value="105.0",
            min_value="95.0",
            change="5.0",
            change_ptc="5%",
        )
        wrapped = TimePointUnion(root=InvestingTimePoint(root=numeric))
        expected = {
            "dt": moment,
            "value_profit": 1.5,
            "value_base": 100.0,
            "value_max": 105.0,
            "value_min": 95.0,
            "value_chng": 5.0,
            "value_chng_prc": 5.0,
        }

        row = _build_value_row("investing", moment, wrapped)

        assert row is not None
        # Compare only the columns under test; the row may hold more keys.
        assert {column: row[column] for column in expected} == expected

    def test_handles_investing_candlestick(self):
        """An InvestingCandlestick point maps onto the OHLC and volume columns."""
        moment = datetime(2025, 1, 15, 12, 0, 0)
        candle = InvestingCandlestick(
            open_="100.0",
            high="105.0",
            low="95.0",
            close="102.0",
            interest=None,
            value="1000",
        )
        wrapped = TimePointUnion(root=InvestingTimePoint(root=candle))
        expected = {
            "dt": moment,
            "price_o": 100.0,
            "price_h": 105.0,
            "price_l": 95.0,
            "price_c": 102.0,
            "volume": 1000.0,
        }

        row = _build_value_row("investing", moment, wrapped)

        assert row is not None
        assert {column: row[column] for column in expected} == expected

    def test_handles_trading_view_timepoint(self):
        """A TradingViewTimePoint maps onto the OHLC and volume columns."""
        moment = datetime(2025, 1, 15, 12, 0, 0)
        bar = TradingViewTimePoint(
            open_="100",
            high="105",
            low="95",
            close="102",
            volume="5000",
        )
        wrapped = TimePointUnion(root=bar)
        expected = {
            "dt": moment,
            "price_o": 100.0,
            "price_h": 105.0,
            "price_l": 95.0,
            "price_c": 102.0,
            "volume": 5000.0,
        }

        row = _build_value_row("tradingview", moment, wrapped)

        assert row is not None
        assert {column: row[column] for column in expected} == expected

    def test_handles_sgx_timepoint(self):
        """An SgxTimePoint maps onto OHLC columns; volume is expected to equal interest."""
        moment = datetime(2025, 1, 15, 12, 0, 0)
        bar = SgxTimePoint(
            open_="100",
            high="105",
            low="95",
            close="102",
            interest="3000",
            value="2000",
        )
        wrapped = TimePointUnion(root=bar)
        expected = {
            "dt": moment,
            "price_o": 100.0,
            "price_h": 105.0,
            "price_l": 95.0,
            "price_c": 102.0,
            "volume": 3000.0,
        }

        row = _build_value_row("sgx", moment, wrapped)

        assert row is not None
        assert {column: row[column] for column in expected} == expected

    def test_handles_bloomberg_timepoint(self):
        """A BloombergTimePoint value lands in value_base."""
        moment = datetime(2025, 1, 15, 12, 0, 0)
        wrapped = TimePointUnion(root=BloombergTimePoint(value="123.45"))
        expected = {"dt": moment, "value_base": 123.45}

        row = _build_value_row("bloomberg", moment, wrapped)

        assert row is not None
        assert {column: row[column] for column in expected} == expected

    def test_handles_cbr_timepoint(self):
        """A CbrTimePoint value with a decimal comma lands in value_base."""
        moment = datetime(2025, 1, 15, 12, 0, 0)
        wrapped = TimePointUnion(root=CbrTimePoint(value="80,32"))
        expected = {"dt": moment, "value_base": 80.32}

        row = _build_value_row("cbr", moment, wrapped)

        assert row is not None
        assert {column: row[column] for column in expected} == expected

    def test_handles_trading_economics_numeric(self):
        """A TradingEconomicsNumeric point maps onto price and change columns."""
        moment = datetime(2025, 1, 15, 12, 0, 0)
        numeric = TradingEconomicsNumeric(
            price="100",
            day="1.5",
            percent="2.0",
            weekly="3.0",
            monthly="4.0",
            ytd="5.0",
            yoy="6.0",
        )
        wrapped = TimePointUnion(root=TradingEconomicsTimePoint(root=numeric))
        expected = {
            "dt": moment,
            "price_i": 100.0,
            "value_day": 1.5,
            "value_prc": 2.0,
            "value_weekly_prc": 3.0,
            "value_monthly_prc": 4.0,
            "value_ytd_prc": 5.0,
            "value_yoy_prc": 6.0,
        }

        row = _build_value_row("tradingeconomics", moment, wrapped)

        assert row is not None
        assert {column: row[column] for column in expected} == expected

    def test_handles_trading_economics_last_prev(self):
        """A TradingEconomicsLastPrev point maps onto last/previous/unit columns."""
        moment = datetime(2025, 1, 15, 12, 0, 0)
        last_prev = TradingEconomicsLastPrev(last="100", previous="95", unit="%")
        wrapped = TimePointUnion(root=TradingEconomicsTimePoint(root=last_prev))
        expected = {
            "dt": moment,
            "value_last": 100.0,
            "value_previous": 95.0,
            "unit": "%",
        }

        row = _build_value_row("tradingeconomics", moment, wrapped)

        assert row is not None
        assert {column: row[column] for column in expected} == expected

    def test_handles_trading_economics_string_percent(self):
        """A TradingEconomicsStringPercent point lands in value_prc."""
        moment = datetime(2025, 1, 15, 12, 0, 0)
        percent_text = TradingEconomicsStringPercent(root="5.5%")
        wrapped = TimePointUnion(root=TradingEconomicsTimePoint(root=percent_text))
        expected = {"dt": moment, "value_prc": 5.5}

        row = _build_value_row("tradingeconomics", moment, wrapped)

        assert row is not None
        assert {column: row[column] for column in expected} == expected

    def test_handles_trading_economics_string_time(self):
        """A TradingEconomicsStringTime point produces no row."""
        moment = datetime(2025, 1, 15, 12, 0, 0)
        time_text = TradingEconomicsStringTime(root="12:00 PM")
        wrapped = TimePointUnion(root=TradingEconomicsTimePoint(root=time_text))

        row = _build_value_row("tradingeconomics", moment, wrapped)

        assert row is None

    def test_handles_trading_economics_empty_string(self):
        """A TradingEconomicsEmptyString point produces a row flagged as empty."""
        moment = datetime(2025, 1, 15, 12, 0, 0)
        empty_text = TradingEconomicsEmptyString(root="")
        wrapped = TimePointUnion(root=TradingEconomicsTimePoint(root=empty_text))

        row = _build_value_row("tradingeconomics", moment, wrapped)

        assert row is not None
        assert row["dt"] == moment
        # Identity check: the flag must be the literal True, not merely truthy.
        assert row["is_empty_str_flg"] is True

    def test_returns_none_for_unknown_type(self):
        """A point of an unsupported type produces no row."""
        moment = datetime(2025, 1, 15, 12, 0, 0)

        row = _build_value_row("unknown", moment, "string_value")

        assert row is None
@pytest.mark.unit
class TestProcessSource:
    """Tests for the _process_source coroutine."""

    @pytest.mark.asyncio
    @patch("dataloader.workers.pipelines.load_tenera.APP_CTX")
    async def test_processes_source_successfully(self, mock_ctx):
        """A source with one valid point triggers section lookup, quote upsert and bulk upsert."""
        mock_ctx.pytz_timezone = pytz.timezone("Europe/Moscow")
        mock_ctx.logger = MagicMock()

        payload = {
            "instrument1": {
                "1609459200": TimePointUnion(root=CbrTimePoint(value="80.5")),
            },
        }

        section = MagicMock(section_id=1)
        quote = MagicMock(quote_id=1)
        repo = AsyncMock()
        repo.get_section_by_name = AsyncMock(return_value=section)
        repo.upsert_quote = AsyncMock(return_value=quote)
        repo.bulk_upsert_quote_values = AsyncMock()

        await _process_source(repo, "cbr", payload)

        repo.get_section_by_name.assert_called_once_with("cbr")
        repo.upsert_quote.assert_called_once()
        repo.bulk_upsert_quote_values.assert_called_once()

    @pytest.mark.asyncio
    @patch("dataloader.workers.pipelines.load_tenera.APP_CTX")
    async def test_skips_source_when_section_not_found(self, mock_ctx):
        """When the section lookup returns None, no quote is upserted and a warning is logged."""
        mock_ctx.pytz_timezone = pytz.timezone("Europe/Moscow")
        log = MagicMock()
        mock_ctx.logger = log

        payload = {"instrument1": {}}

        repo = AsyncMock()
        repo.get_section_by_name = AsyncMock(return_value=None)

        await _process_source(repo, "unknown", payload)

        repo.get_section_by_name.assert_called_once_with("unknown")
        repo.upsert_quote.assert_not_called()
        log.warning.assert_called()

    @pytest.mark.asyncio
    @patch("dataloader.workers.pipelines.load_tenera.APP_CTX")
    async def test_skips_instruments_with_no_valid_rows(self, mock_ctx):
        """An instrument whose points yield no rows is upserted as a quote but has no values written."""
        mock_ctx.pytz_timezone = pytz.timezone("Europe/Moscow")
        mock_ctx.logger = MagicMock()

        payload = {
            "instrument1": {
                "invalid_ts": "invalid_data",
            },
        }

        section = MagicMock()
        quote = MagicMock()
        repo = AsyncMock()
        repo.get_section_by_name = AsyncMock(return_value=section)
        repo.upsert_quote = AsyncMock(return_value=quote)
        repo.bulk_upsert_quote_values = AsyncMock()

        await _process_source(repo, "cbr", payload)

        repo.upsert_quote.assert_called_once()
        repo.bulk_upsert_quote_values.assert_not_called()
@pytest.mark.unit
class TestLoadTeneraPipeline:
    """Tests for the load_tenera pipeline.

    Every mocked async context manager has ``__aexit__`` resolve to ``False``.
    A bare ``AsyncMock()`` resolves to a (truthy) mock object, and a truthy
    ``__aexit__`` result makes ``async with`` suppress any exception raised in
    its body, which would let a failing pipeline pass these tests unnoticed.
    """

    # Attribute names populated on the mocked MainData object in every test.
    _SOURCE_ATTRS = (
        "cbr",
        "investing",
        "sgx",
        "tradingeconomics",
        "bloomberg",
        "trading_view",
    )

    @classmethod
    def _make_main_data(cls, **sources):
        """Return a MainData-spec mock; sources not given default to an empty dict."""
        data = MagicMock(spec=MainData)
        for attr in cls._SOURCE_ATTRS:
            setattr(data, attr, sources.get(attr, {}))
        return data

    @staticmethod
    def _make_tenera(main_data):
        """Return a mocked Tenera interface usable in ``async with`` that yields main_data."""
        tenera = AsyncMock()
        tenera.__aenter__ = AsyncMock(return_value=tenera)
        # False => exceptions raised inside the ``async with`` body propagate.
        tenera.__aexit__ = AsyncMock(return_value=False)
        tenera.get_quotes_data = AsyncMock(return_value=main_data)
        return tenera

    @staticmethod
    def _make_sessionmaker(session):
        """Return a mocked sessionmaker whose call result is an async context manager for session."""
        sessionmaker = MagicMock()
        sessionmaker.return_value.__aenter__ = AsyncMock(return_value=session)
        # False => exceptions raised inside the ``async with`` body propagate.
        sessionmaker.return_value.__aexit__ = AsyncMock(return_value=False)
        return sessionmaker

    @staticmethod
    def _make_repo(section, quote):
        """Return a mocked repository returning the given section and quote."""
        repo = AsyncMock()
        repo.get_section_by_name = AsyncMock(return_value=section)
        repo.upsert_quote = AsyncMock(return_value=quote)
        repo.bulk_upsert_quote_values = AsyncMock()
        return repo

    @pytest.mark.asyncio
    @patch("dataloader.workers.pipelines.load_tenera.APP_CTX")
    async def test_full_pipeline_success(self, mock_ctx):
        """The pipeline yields at least once, fetches quotes, looks up a section and commits."""
        mock_ctx.pytz_timezone = pytz.timezone("Europe/Moscow")
        mock_ctx.logger = MagicMock()

        mock_data = self._make_main_data(
            cbr={"USD": {"1609459200": TimePointUnion(root=CbrTimePoint(value="75.0"))}},
        )
        mock_tenera = self._make_tenera(mock_data)

        mock_session = AsyncMock()
        mock_ctx.sessionmaker = self._make_sessionmaker(mock_session)

        mock_section = MagicMock()
        mock_section.section_id = 1
        mock_quote = MagicMock()
        mock_quote.quote_id = 1
        mock_repo = self._make_repo(mock_section, mock_quote)

        with (
            patch(
                "dataloader.workers.pipelines.load_tenera.get_async_tenera_interface",
                return_value=mock_tenera,
            ),
            patch(
                "dataloader.workers.pipelines.load_tenera.QuotesRepository",
                return_value=mock_repo,
            ),
        ):
            steps = []
            async for _ in load_tenera({}):
                steps.append("step")

        assert len(steps) >= 1
        mock_tenera.get_quotes_data.assert_called_once()
        mock_repo.get_section_by_name.assert_called()
        mock_session.commit.assert_called()

    @pytest.mark.asyncio
    @patch("dataloader.workers.pipelines.load_tenera.APP_CTX")
    async def test_pipeline_processes_multiple_sources(self, mock_ctx):
        """With two non-empty sources the section lookup happens at least twice."""
        mock_ctx.pytz_timezone = pytz.timezone("Europe/Moscow")
        mock_ctx.logger = MagicMock()

        mock_data = self._make_main_data(cbr={"USD": {}}, investing={"SPX": {}})
        mock_tenera = self._make_tenera(mock_data)

        mock_session = AsyncMock()
        mock_ctx.sessionmaker = self._make_sessionmaker(mock_session)

        mock_repo = self._make_repo(MagicMock(), MagicMock())

        with (
            patch(
                "dataloader.workers.pipelines.load_tenera.get_async_tenera_interface",
                return_value=mock_tenera,
            ),
            patch(
                "dataloader.workers.pipelines.load_tenera.QuotesRepository",
                return_value=mock_repo,
            ),
        ):
            async for _ in load_tenera({}):
                pass

        assert mock_repo.get_section_by_name.call_count >= 2