From 0ea7add4c36a39f60fa5bfc11d9a630fa9ab1c43 Mon Sep 17 00:00:00 2001 From: itqop Date: Thu, 6 Nov 2025 13:33:04 +0300 Subject: [PATCH] tests: add new tests --- src/dataloader/storage/repositories/quotes.py | 70 --- .../test_pipeline_load_opu_integration.py | 123 ++++ .../test_pipeline_load_tenera_integration.py | 189 ++++++ tests/unit/test_pipeline_load_opu.py | 253 +++++++++ tests/unit/test_pipeline_load_tenera.py | 536 ++++++++++++++++++ 5 files changed, 1101 insertions(+), 70 deletions(-) create mode 100644 tests/integration_tests/test_pipeline_load_opu_integration.py create mode 100644 tests/integration_tests/test_pipeline_load_tenera_integration.py create mode 100644 tests/unit/test_pipeline_load_opu.py create mode 100644 tests/unit/test_pipeline_load_tenera.py diff --git a/src/dataloader/storage/repositories/quotes.py b/src/dataloader/storage/repositories/quotes.py index c9a8599..e60dee8 100644 --- a/src/dataloader/storage/repositories/quotes.py +++ b/src/dataloader/storage/repositories/quotes.py @@ -39,48 +39,6 @@ class QuotesRepository: result = await self.s.execute(stmt) return result.scalar_one_or_none() - async def get_or_create_section( - self, name: str, params: dict | None = None - ) -> QuoteSection: - """ - Получить существующую секцию или создать новую. - - Args: - name: Имя секции - params: Параметры секции - - Returns: - QuoteSection - """ - existing = await self.get_section_by_name(name) - if existing: - return existing - - async with self.s.begin_nested(): - obj = QuoteSection(name=name, params=params or {}) - self.s.add(obj) - await self.s.flush() - await self.s.refresh(obj) - return obj - - async def get_quote_by_name(self, section_id: int, name: str) -> Quote | None: - """ - Получить котировку по имени в рамках секции. - - Args: - section_id: ID секции - name: Имя котировки - - Returns: - Quote или None - """ - stmt = sa.select(Quote).where( - Quote.quote_sect_id == section_id, - Quote.name == name, - ) - result = await self.s.execute(stmt) - return result.scalar_one_or_none() - async def upsert_quote( self, section: QuoteSection, @@ -176,31 +134,3 @@ class QuotesRepository: async with self.s.begin_nested(): await self.s.execute(stmt) await self.s.flush() - - async def list_quote_values_for_period( - self, - quote_id: int, - dt_from: datetime, - dt_to: datetime, - ) -> list[QuoteValue]: - """ - Получить значения котировки за период. - - Args: - quote_id: ID котировки - dt_from: Дата начала - dt_to: Дата окончания - - Returns: - Список QuoteValue - """ - stmt = ( - sa.select(QuoteValue) - .where( - QuoteValue.quote_id == quote_id, - QuoteValue.dt.between(dt_from, dt_to), - ) - .order_by(QuoteValue.dt) - ) - result = await self.s.execute(stmt) - return list(result.scalars().all()) diff --git a/tests/integration_tests/test_pipeline_load_opu_integration.py b/tests/integration_tests/test_pipeline_load_opu_integration.py new file mode 100644 index 0000000..5393601 --- /dev/null +++ b/tests/integration_tests/test_pipeline_load_opu_integration.py @@ -0,0 +1,123 @@ +""" +Интеграционные тесты для пайплайна load_opu. + +ВНИМАНИЕ: Эти тесты требуют работающего Gmap2Brief API и настоящего соединения с БД. +По умолчанию они исключены из запуска через pytest.mark.skip. +Для запуска используйте: pytest tests/integration_tests/test_pipeline_load_opu_integration.py --no-skip +""" + +from __future__ import annotations + +import pytest + +from dataloader.context import APP_CTX +from dataloader.interfaces.gmap2_brief.interface import get_gmap2brief_interface +from dataloader.storage.repositories.opu import OpuRepository +from dataloader.workers.pipelines.load_opu import load_opu + + +@pytest.mark.integration +@pytest.mark.skip(reason="Requires working Gmap2Brief API - run manually when service is available") +class TestLoadOpuIntegration: + """Интеграционные тесты для пайплайна load_opu.""" + + @pytest.mark.asyncio + async def test_full_opu_pipeline_with_real_api(self, db_session): + """ + Тест полного пайплайна OPU с реальным API. + + Требования: + - Gmap2Brief API должен быть доступен + - База данных должна быть настроена + - Схема OPU должна существовать + """ + interface = get_gmap2brief_interface() + + try: + job_id = await interface.start_export() + assert job_id is not None + assert isinstance(job_id, str) + + status = await interface.wait_for_completion(job_id, max_wait=300) + assert status.status == "completed" + assert status.total_rows > 0 + + except Exception as e: + pytest.skip(f"Gmap2Brief API not available: {e}") + + steps = 0 + async for _ in load_opu({}): + steps += 1 + + assert steps > 0 + + async with APP_CTX.sessionmaker() as session: + repo = OpuRepository(session) + + result = await session.execute( + "SELECT COUNT(*) FROM opu.brief_digital_certificate_opu" + ) + count = result.scalar() + + assert count > 0 + + @pytest.mark.asyncio + async def test_opu_repository_truncate(self, db_session): + """ + Тест операции TRUNCATE репозитория OPU. + + Требование: схема OPU должна существовать в БД. + """ + repo = OpuRepository(db_session) + + await repo.truncate() + await db_session.commit() + + result = await db_session.execute( + "SELECT COUNT(*) FROM opu.brief_digital_certificate_opu" + ) + count = result.scalar() + + assert count == 0 + + @pytest.mark.asyncio + async def test_opu_repository_bulk_insert(self, db_session): + """ + Тест массовой вставки данных в репозиторий OPU. + + Требование: схема OPU должна существовать в БД. + """ + repo = OpuRepository(db_session) + + await repo.truncate() + await db_session.commit() + + test_records = [ + { + "object_id": f"test_{i}", + "desk_nm": "TEST_DESK", + "actdate": "2025-01-15", + "layer_cd": "LAYER1", + "opu_cd": "OPU1", + "opu_lvl": 1, + "opu_prnt_cd": "PARENT", + "object_unit": "UNIT1", + "opu_nm": f"Test OPU {i}", + } + for i in range(10) + ] + + inserted = await repo.bulk_insert(test_records) + await db_session.commit() + + assert inserted == 10 + + result = await db_session.execute( + "SELECT COUNT(*) FROM opu.brief_digital_certificate_opu WHERE desk_nm = 'TEST_DESK'" + ) + count = result.scalar() + + assert count == 10 + + await repo.truncate() + await db_session.commit() diff --git a/tests/integration_tests/test_pipeline_load_tenera_integration.py b/tests/integration_tests/test_pipeline_load_tenera_integration.py new file mode 100644 index 0000000..b57ebeb --- /dev/null +++ b/tests/integration_tests/test_pipeline_load_tenera_integration.py @@ -0,0 +1,189 @@ +""" +Интеграционные тесты для пайплайна load_tenera. + +ВНИМАНИЕ: Эти тесты требуют работающего SuperTenera API и настоящего соединения с БД. +По умолчанию они исключены из запуска через pytest.mark.skip. +Для запуска используйте: pytest tests/integration_tests/test_pipeline_load_tenera_integration.py --no-skip +""" + +from __future__ import annotations + +import pytest + +from dataloader.context import APP_CTX +from dataloader.interfaces.tenera.interface import get_async_tenera_interface +from dataloader.storage.repositories.quotes import QuotesRepository +from dataloader.workers.pipelines.load_tenera import load_tenera + + +@pytest.mark.integration +@pytest.mark.skip(reason="Requires working SuperTenera API - run manually when service is available") +class TestLoadTeneraIntegration: + """Интеграционные тесты для пайплайна load_tenera.""" + + @pytest.mark.asyncio + async def test_full_tenera_pipeline_with_real_api(self, db_session): + """ + Тест полного пайплайна TENERA с реальным API. + + Требования: + - SuperTenera API должен быть доступен + - База данных должна быть настроена + - Схема quotes должна существовать + - Таблицы quote_section, quote, quote_value должны существовать + """ + try: + async with get_async_tenera_interface() as tenera: + data = await tenera.get_quotes_data() + assert data is not None + + except Exception as e: + pytest.skip(f"SuperTenera API not available: {e}") + + steps = 0 + async for _ in load_tenera({}): + steps += 1 + + assert steps > 0 + + async with APP_CTX.sessionmaker() as session: + result = await session.execute("SELECT COUNT(*) FROM quotes.quote_value") + count = result.scalar() + + assert count > 0 + + @pytest.mark.asyncio + async def test_tenera_interface_get_quotes_data(self): + """ + Тест получения данных котировок из SuperTenera API. + + Требование: SuperTenera API должен быть доступен. + """ + try: + async with get_async_tenera_interface() as tenera: + data = await tenera.get_quotes_data() + + assert data is not None + assert hasattr(data, "cbr") + assert hasattr(data, "investing") + assert hasattr(data, "sgx") + assert hasattr(data, "tradingeconomics") + assert hasattr(data, "bloomberg") + assert hasattr(data, "trading_view") + + except Exception as e: + pytest.skip(f"SuperTenera API not available: {e}") + + @pytest.mark.asyncio + async def test_quotes_repository_get_section_by_name(self, db_session): + """ + Тест получения секции по имени. + + Требование: схема quotes и таблица quote_section должны существовать в БД. + """ + repo = QuotesRepository(db_session) + + section = await repo.get_section_by_name("cbr") + + if section is not None: + assert section.section_nm == "cbr" + assert section.section_id is not None + else: + pytest.skip("Section 'cbr' not found in database - seed data required") + + @pytest.mark.asyncio + async def test_quotes_repository_upsert_quote(self, db_session): + """ + Тест upsert котировки. + + Требование: схема quotes должна существовать в БД. + """ + repo = QuotesRepository(db_session) + + section = await repo.get_section_by_name("cbr") + if section is None: + pytest.skip("Section 'cbr' not found - cannot test upsert") + + from datetime import datetime + + quote = await repo.upsert_quote( + section=section, + name="TEST_USD", + last_update_dttm=datetime.now(), + ) + + assert quote is not None + assert quote.quote_nm == "TEST_USD" + assert quote.section_id == section.section_id + + quote2 = await repo.upsert_quote( + section=section, + name="TEST_USD", + last_update_dttm=datetime.now(), + ) + + assert quote2.quote_id == quote.quote_id + + @pytest.mark.asyncio + async def test_quotes_repository_bulk_upsert_quote_values(self, db_session): + """ + Тест массового upsert значений котировок. + + Требование: схема quotes должна существовать в БД. + """ + repo = QuotesRepository(db_session) + + section = await repo.get_section_by_name("cbr") + if section is None: + pytest.skip("Section 'cbr' not found - cannot test bulk upsert") + + from datetime import datetime + + quote = await repo.upsert_quote( + section=section, + name="TEST_BULK_USD", + last_update_dttm=datetime.now(), + ) + + test_rows = [ + { + "dt": datetime(2025, 1, 15, i, 0, 0), + "value_base": 75.0 + i, + } + for i in range(5) + ] + + await repo.bulk_upsert_quote_values(quote, test_rows) + await db_session.commit() + + result = await db_session.execute( + "SELECT COUNT(*) FROM quotes.quote_value WHERE quote_id = :quote_id", + {"quote_id": quote.quote_id}, + ) + count = result.scalar() + + assert count == 5 + + @pytest.mark.asyncio + async def test_tenera_pipeline_processes_all_sources(self, db_session): + """ + Тест что пайплайн обрабатывает все источники. + + Требования: + - SuperTenera API должен быть доступен + - Все секции должны существовать в БД + """ + try: + async with get_async_tenera_interface() as tenera: + data = await tenera.get_quotes_data() + + sources_with_data = [] + for source_name in ["cbr", "investing", "sgx", "tradingeconomics", "bloomberg", "trading_view"]: + source_data = getattr(data, source_name, None) + if source_data: + sources_with_data.append(source_name) + + assert len(sources_with_data) > 0 + + except Exception as e: + pytest.skip(f"SuperTenera API not available: {e}") diff --git a/tests/unit/test_pipeline_load_opu.py b/tests/unit/test_pipeline_load_opu.py new file mode 100644 index 0000000..b934321 --- /dev/null +++ b/tests/unit/test_pipeline_load_opu.py @@ -0,0 +1,253 @@ +"""Unit тесты для пайплайна load_opu.""" + +from __future__ import annotations + +import tempfile +from datetime import date, datetime +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, Mock, patch + +import orjson +import pytest +import zstandard as zstd + +from dataloader.interfaces.gmap2_brief.schemas import ExportJobStatus +from dataloader.workers.pipelines.load_opu import ( + _convert_record, + _parse_jsonl_from_zst, + load_opu, +) + + +@pytest.mark.unit +class TestParseJsonlFromZst: + """Тесты для функции _parse_jsonl_from_zst.""" + + def test_parses_valid_zst_file_with_small_batch(self): + """Тест парсинга валидного zst файла с небольшим батчем.""" + records = [ + {"id": 1, "name": "test1"}, + {"id": 2, "name": "test2"}, + {"id": 3, "name": "test3"}, + ] + + lines = [orjson.dumps(r) for r in records] + content = b"\n".join(lines) + + with tempfile.TemporaryDirectory() as tmpdir: + file_path = Path(tmpdir) / "test.jsonl.zst" + + cctx = zstd.ZstdCompressor() + compressed = cctx.compress(content) + + with open(file_path, "wb") as f: + f.write(compressed) + + batches = list(_parse_jsonl_from_zst(file_path, chunk_size=2)) + + assert len(batches) == 2 + assert batches[0] == [{"id": 1, "name": "test1"}, {"id": 2, "name": "test2"}] + assert batches[1] == [{"id": 3, "name": "test3"}] + + def test_parses_empty_file(self): + """Тест парсинга пустого файла.""" + with tempfile.TemporaryDirectory() as tmpdir: + file_path = Path(tmpdir) / "empty.jsonl.zst" + + cctx = zstd.ZstdCompressor() + compressed = cctx.compress(b"") + + with open(file_path, "wb") as f: + f.write(compressed) + + batches = list(_parse_jsonl_from_zst(file_path, chunk_size=100)) + + assert len(batches) == 0 + + def test_skips_empty_lines(self): + """Тест пропуска пустых строк.""" + records = [ + {"id": 1}, + {"id": 2}, + ] + + lines = [orjson.dumps(records[0]), b"", b" ", orjson.dumps(records[1])] + content = b"\n".join(lines) + + with tempfile.TemporaryDirectory() as tmpdir: + file_path = Path(tmpdir) / "test.jsonl.zst" + + cctx = zstd.ZstdCompressor() + compressed = cctx.compress(content) + + with open(file_path, "wb") as f: + f.write(compressed) + + batches = list(_parse_jsonl_from_zst(file_path, chunk_size=10)) + + assert len(batches) == 1 + assert batches[0] == [{"id": 1}, {"id": 2}] + + @patch("dataloader.workers.pipelines.load_opu.APP_CTX") + def test_handles_invalid_json_gracefully(self, mock_ctx): + """Тест обработки невалидного JSON.""" + mock_logger = MagicMock() + mock_ctx.logger = mock_logger + + lines = [ + orjson.dumps({"id": 1}), + b"{invalid json}", + orjson.dumps({"id": 2}), + ] + content = b"\n".join(lines) + + with tempfile.TemporaryDirectory() as tmpdir: + file_path = Path(tmpdir) / "test.jsonl.zst" + + cctx = zstd.ZstdCompressor() + compressed = cctx.compress(content) + + with open(file_path, "wb") as f: + f.write(compressed) + + batches = list(_parse_jsonl_from_zst(file_path, chunk_size=10)) + + assert len(batches) == 1 + assert batches[0] == [{"id": 1}, {"id": 2}] + mock_logger.warning.assert_called() + + +@pytest.mark.unit +class TestConvertRecord: + """Тесты для функции _convert_record.""" + + def test_converts_actdate_string_to_date(self): + """Тест конвертации actdate из строки в date.""" + raw = { + "id": 1, + "actdate": "2025-01-15", + "name": "test", + } + + result = _convert_record(raw) + + assert result["id"] == 1 + assert result["actdate"] == date(2025, 1, 15) + assert result["name"] == "test" + + def test_converts_wf_load_dttm_string_to_datetime(self): + """Тест конвертации wf_load_dttm из строки в datetime.""" + raw = { + "id": 1, + "wf_load_dttm": "2025-01-15T12:30:45", + } + + result = _convert_record(raw) + + assert result["id"] == 1 + assert result["wf_load_dttm"] == datetime(2025, 1, 15, 12, 30, 45) + + def test_keeps_non_date_fields_unchanged(self): + """Тест сохранения полей без конвертации.""" + raw = { + "id": 1, + "name": "test", + "value": 123.45, + } + + result = _convert_record(raw) + + assert result == raw + + def test_handles_already_converted_dates(self): + """Тест обработки уже сконвертированных дат.""" + actdate_obj = date(2025, 1, 15) + wf_load_dttm_obj = datetime(2025, 1, 15, 12, 30, 45) + + raw = { + "id": 1, + "actdate": actdate_obj, + "wf_load_dttm": wf_load_dttm_obj, + } + + result = _convert_record(raw) + + assert result["actdate"] == actdate_obj + assert result["wf_load_dttm"] == wf_load_dttm_obj + + +@pytest.mark.unit +class TestLoadOpuPipeline: + """Тесты для пайплайна load_opu.""" + + @pytest.mark.asyncio + async def test_full_pipeline_success(self): + """Тест успешного выполнения полного пайплайна.""" + mock_interface = AsyncMock() + mock_interface.start_export = AsyncMock(return_value="job123") + mock_interface.wait_for_completion = AsyncMock( + return_value=ExportJobStatus( + job_id="job123", + status="completed", + total_rows=100, + ) + ) + + with tempfile.TemporaryDirectory() as tmpdir: + archive_path = Path(tmpdir) / "test.jsonl.zst" + + records = [ + {"id": i, "actdate": "2025-01-15", "name": f"test{i}"} for i in range(10) + ] + lines = [orjson.dumps(r) for r in records] + content = b"\n".join(lines) + + cctx = zstd.ZstdCompressor() + compressed = cctx.compress(content) + with open(archive_path, "wb") as f: + f.write(compressed) + + async def mock_download(job_id: str, output_path: Path): + with open(archive_path, "rb") as src: + with open(output_path, "wb") as dst: + dst.write(src.read()) + + mock_interface.download_export = AsyncMock(side_effect=mock_download) + + mock_session = AsyncMock() + mock_sessionmaker = MagicMock() + mock_sessionmaker.return_value.__aenter__ = AsyncMock( + return_value=mock_session + ) + mock_sessionmaker.return_value.__aexit__ = AsyncMock() + + mock_repo = AsyncMock() + mock_repo.truncate = AsyncMock() + mock_repo.bulk_insert = AsyncMock(return_value=10) + + mock_app_ctx = MagicMock() + mock_app_ctx.logger = MagicMock() + mock_app_ctx.sessionmaker = mock_sessionmaker + + with ( + patch( + "dataloader.workers.pipelines.load_opu.get_gmap2brief_interface", + return_value=mock_interface, + ), + patch( + "dataloader.workers.pipelines.load_opu.OpuRepository", + return_value=mock_repo, + ), + patch("dataloader.workers.pipelines.load_opu.APP_CTX", mock_app_ctx), + ): + steps = [] + async for _ in load_opu({}): + steps.append("step") + + assert len(steps) >= 4 + + mock_interface.start_export.assert_called_once() + mock_interface.wait_for_completion.assert_called_once_with("job123") + mock_interface.download_export.assert_called_once() + mock_repo.truncate.assert_called_once() + mock_repo.bulk_insert.assert_called() diff --git a/tests/unit/test_pipeline_load_tenera.py b/tests/unit/test_pipeline_load_tenera.py new file mode 100644 index 0000000..1364fad --- /dev/null +++ b/tests/unit/test_pipeline_load_tenera.py @@ -0,0 +1,536 @@ +"""Unit тесты для пайплайна load_tenera.""" + +from __future__ import annotations + +from datetime import datetime +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +import pytz + +from dataloader.interfaces.tenera.schemas import ( + BloombergTimePoint, + CbrTimePoint, + InvestingCandlestick, + InvestingNumeric, + InvestingTimePoint, + MainData, + SgxTimePoint, + TimePointUnion, + TradingEconomicsEmptyString, + TradingEconomicsLastPrev, + TradingEconomicsNumeric, + TradingEconomicsStringPercent, + TradingEconomicsStringTime, + TradingEconomicsTimePoint, + TradingViewTimePoint, +) +from dataloader.workers.pipelines.load_tenera import ( + _build_value_row, + _parse_ts_to_datetime, + _process_source, + _to_float, + load_tenera, +) + + +@pytest.mark.unit +class TestToFloat: + """Тесты для функции _to_float.""" + + def test_converts_int_to_float(self): + """Тест конвертации int в float.""" + assert _to_float(42) == 42.0 + + def test_converts_float_to_float(self): + """Тест что float остается float.""" + assert _to_float(3.14) == 3.14 + + def test_converts_string_number_to_float(self): + """Тест конвертации строки с числом.""" + assert _to_float("123.45") == 123.45 + + def test_converts_string_with_comma_to_float(self): + """Тест конвертации строки с запятой.""" + assert _to_float("123,45") == 123.45 + + def test_converts_string_with_percent_to_float(self): + """Тест конвертации строки с процентом.""" + assert _to_float("12.5%") == 12.5 + + def test_converts_string_with_spaces_to_float(self): + """Тест конвертации строки с пробелами.""" + assert _to_float(" 123.45 ") == 123.45 + + def test_returns_none_for_none(self): + """Тест что None возвращает None.""" + assert _to_float(None) is None + + def test_returns_none_for_empty_string(self): + """Тест что пустая строка возвращает None.""" + assert _to_float("") is None + assert _to_float(" ") is None + + def test_returns_none_for_invalid_string(self): + """Тест что невалидная строка возвращает None.""" + assert _to_float("invalid") is None + assert _to_float("abc123") is None + + +@pytest.mark.unit +class TestParseTsToDatetime: + """Тесты для функции _parse_ts_to_datetime.""" + + @patch("dataloader.workers.pipelines.load_tenera.APP_CTX") + def test_parses_valid_timestamp(self, mock_ctx): + """Тест парсинга валидного timestamp.""" + mock_ctx.pytz_timezone = pytz.timezone("Europe/Moscow") + + result = _parse_ts_to_datetime("1609459200") + + assert result is not None + assert isinstance(result, datetime) + assert result.tzinfo is None + + @patch("dataloader.workers.pipelines.load_tenera.APP_CTX") + def test_parses_timestamp_with_whitespace(self, mock_ctx): + """Тест парсинга timestamp с пробелами.""" + mock_ctx.pytz_timezone = pytz.timezone("Europe/Moscow") + + result = _parse_ts_to_datetime(" 1609459200 ") + + assert result is not None + assert isinstance(result, datetime) + + def test_returns_none_for_empty_string(self): + """Тест что пустая строка возвращает None.""" + assert _parse_ts_to_datetime("") is None + assert _parse_ts_to_datetime(" ") is None + + def test_returns_none_for_non_digit_string(self): + """Тест что не-цифровая строка возвращает None.""" + assert _parse_ts_to_datetime("abc123") is None + assert _parse_ts_to_datetime("2025-01-15") is None + + @patch("dataloader.workers.pipelines.load_tenera.APP_CTX") + def test_handles_invalid_timestamp(self, mock_ctx): + """Тест обработки невалидного timestamp.""" + mock_ctx.pytz_timezone = pytz.timezone("Europe/Moscow") + + result = _parse_ts_to_datetime("999999999999999") + + assert result is None + + +@pytest.mark.unit +class TestBuildValueRow: + """Тесты для функции _build_value_row.""" + + def test_handles_int_point(self): + """Тест обработки int значения.""" + dt = datetime(2025, 1, 15, 12, 0, 0) + + result = _build_value_row("test", dt, 42) + + assert result == {"dt": dt, "key": 42} + + def test_handles_investing_numeric(self): + """Тест обработки InvestingNumeric.""" + dt = datetime(2025, 1, 15, 12, 0, 0) + + inner = InvestingNumeric( + profit="1.5%", + base_value="100.0", + max_value="105.0", + min_value="95.0", + change="5.0", + change_ptc="5%", + ) + point = TimePointUnion(root=InvestingTimePoint(root=inner)) + + result = _build_value_row("investing", dt, point) + + assert result is not None + assert result["dt"] == dt + assert result["value_profit"] == 1.5 + assert result["value_base"] == 100.0 + assert result["value_max"] == 105.0 + assert result["value_min"] == 95.0 + assert result["value_chng"] == 5.0 + assert result["value_chng_prc"] == 5.0 + + def test_handles_investing_candlestick(self): + """Тест обработки InvestingCandlestick.""" + dt = datetime(2025, 1, 15, 12, 0, 0) + + inner = InvestingCandlestick( + open_="100.0", high="105.0", low="95.0", close="102.0", interest=None, value="1000" + ) + point = TimePointUnion(root=InvestingTimePoint(root=inner)) + + result = _build_value_row("investing", dt, point) + + assert result is not None + assert result["dt"] == dt + assert result["price_o"] == 100.0 + assert result["price_h"] == 105.0 + assert result["price_l"] == 95.0 + assert result["price_c"] == 102.0 + assert result["volume"] == 1000.0 + + def test_handles_trading_view_timepoint(self): + """Тест обработки TradingViewTimePoint.""" + dt = datetime(2025, 1, 15, 12, 0, 0) + + inner = TradingViewTimePoint( + open_="100", high="105", low="95", close="102", volume="5000" + ) + point = TimePointUnion(root=inner) + + result = _build_value_row("tradingview", dt, point) + + assert result is not None + assert result["dt"] == dt + assert result["price_o"] == 100.0 + assert result["price_h"] == 105.0 + assert result["price_l"] == 95.0 + assert result["price_c"] == 102.0 + assert result["volume"] == 5000.0 + + def test_handles_sgx_timepoint(self): + """Тест обработки SgxTimePoint.""" + dt = datetime(2025, 1, 15, 12, 0, 0) + + inner = SgxTimePoint( + open_="100", high="105", low="95", close="102", interest="3000", value="2000" + ) + point = TimePointUnion(root=inner) + + result = _build_value_row("sgx", dt, point) + + assert result is not None + assert result["dt"] == dt + assert result["price_o"] == 100.0 + assert result["price_h"] == 105.0 + assert result["price_l"] == 95.0 + assert result["price_c"] == 102.0 + assert result["volume"] == 3000.0 + + def test_handles_bloomberg_timepoint(self): + """Тест обработки BloombergTimePoint.""" + dt = datetime(2025, 1, 15, 12, 0, 0) + + inner = BloombergTimePoint(value="123.45") + point = TimePointUnion(root=inner) + + result = _build_value_row("bloomberg", dt, point) + + assert result is not None + assert result["dt"] == dt + assert result["value_base"] == 123.45 + + def test_handles_cbr_timepoint(self): + """Тест обработки CbrTimePoint.""" + dt = datetime(2025, 1, 15, 12, 0, 0) + + inner = CbrTimePoint(value="80,32") + point = TimePointUnion(root=inner) + + result = _build_value_row("cbr", dt, point) + + assert result is not None + assert result["dt"] == dt + assert result["value_base"] == 80.32 + + def test_handles_trading_economics_numeric(self): + """Тест обработки TradingEconomicsNumeric.""" + dt = datetime(2025, 1, 15, 12, 0, 0) + + deep_inner = TradingEconomicsNumeric( + price="100", + day="1.5", + percent="2.0", + weekly="3.0", + monthly="4.0", + ytd="5.0", + yoy="6.0", + ) + inner = TradingEconomicsTimePoint(root=deep_inner) + point = TimePointUnion(root=inner) + + result = _build_value_row("tradingeconomics", dt, point) + + assert result is not None + assert result["dt"] == dt + assert result["price_i"] == 100.0 + assert result["value_day"] == 1.5 + assert result["value_prc"] == 2.0 + assert result["value_weekly_prc"] == 3.0 + assert result["value_monthly_prc"] == 4.0 + assert result["value_ytd_prc"] == 5.0 + assert result["value_yoy_prc"] == 6.0 + + def test_handles_trading_economics_last_prev(self): + """Тест обработки TradingEconomicsLastPrev.""" + dt = datetime(2025, 1, 15, 12, 0, 0) + + deep_inner = TradingEconomicsLastPrev(last="100", previous="95", unit="%") + inner = TradingEconomicsTimePoint(root=deep_inner) + point = TimePointUnion(root=inner) + + result = _build_value_row("tradingeconomics", dt, point) + + assert result is not None + assert result["dt"] == dt + assert result["value_last"] == 100.0 + assert result["value_previous"] == 95.0 + assert result["unit"] == "%" + + def test_handles_trading_economics_string_percent(self): + """Тест обработки TradingEconomicsStringPercent.""" + dt = datetime(2025, 1, 15, 12, 0, 0) + + deep_inner = TradingEconomicsStringPercent(root="5.5%") + inner = TradingEconomicsTimePoint(root=deep_inner) + point = TimePointUnion(root=inner) + + result = _build_value_row("tradingeconomics", dt, point) + + assert result is not None + assert result["dt"] == dt + assert result["value_prc"] == 5.5 + + def test_handles_trading_economics_string_time(self): + """Тест обработки TradingEconomicsStringTime.""" + dt = datetime(2025, 1, 15, 12, 0, 0) + + deep_inner = TradingEconomicsStringTime(root="12:00 PM") + inner = TradingEconomicsTimePoint(root=deep_inner) + point = TimePointUnion(root=inner) + + result = _build_value_row("tradingeconomics", dt, point) + + assert result is None + + def test_handles_trading_economics_empty_string(self): + """Тест обработки TradingEconomicsEmptyString.""" + dt = datetime(2025, 1, 15, 12, 0, 0) + + deep_inner = TradingEconomicsEmptyString(root="") + inner = TradingEconomicsTimePoint(root=deep_inner) + point = TimePointUnion(root=inner) + + result = _build_value_row("tradingeconomics", dt, point) + + assert result is not None + assert result["dt"] == dt + assert result["is_empty_str_flg"] is True + + def test_returns_none_for_unknown_type(self): + """Тест что неизвестный тип возвращает None.""" + dt = datetime(2025, 1, 15, 12, 0, 0) + + result = _build_value_row("unknown", dt, "string_value") + + assert result is None + + +@pytest.mark.unit +class TestProcessSource: + """Тесты для функции _process_source.""" + + @pytest.mark.asyncio + @patch("dataloader.workers.pipelines.load_tenera.APP_CTX") + async def test_processes_source_successfully(self, mock_ctx): + """Тест успешной обработки источника.""" + mock_ctx.pytz_timezone = pytz.timezone("Europe/Moscow") + mock_logger = MagicMock() + mock_ctx.logger = mock_logger + + mock_repo = AsyncMock() + + mock_section = MagicMock() + mock_section.section_id = 1 + mock_repo.get_section_by_name = AsyncMock(return_value=mock_section) + + mock_quote = MagicMock() + mock_quote.quote_id = 1 + mock_repo.upsert_quote = AsyncMock(return_value=mock_quote) + + mock_repo.bulk_upsert_quote_values = AsyncMock() + + source_data = { + "instrument1": { + "1609459200": TimePointUnion(root=CbrTimePoint(value="80.5")), + }, + } + + await _process_source(mock_repo, "cbr", source_data) + + mock_repo.get_section_by_name.assert_called_once_with("cbr") + mock_repo.upsert_quote.assert_called_once() + mock_repo.bulk_upsert_quote_values.assert_called_once() + + @pytest.mark.asyncio + @patch("dataloader.workers.pipelines.load_tenera.APP_CTX") + async def test_skips_source_when_section_not_found(self, mock_ctx): + """Тест пропуска источника когда секция не найдена.""" + mock_ctx.pytz_timezone = pytz.timezone("Europe/Moscow") + mock_logger = MagicMock() + mock_ctx.logger = mock_logger + + mock_repo = AsyncMock() + mock_repo.get_section_by_name = AsyncMock(return_value=None) + + source_data = {"instrument1": {}} + + await _process_source(mock_repo, "unknown", source_data) + + mock_repo.get_section_by_name.assert_called_once_with("unknown") + mock_repo.upsert_quote.assert_not_called() + mock_logger.warning.assert_called() + + @pytest.mark.asyncio + @patch("dataloader.workers.pipelines.load_tenera.APP_CTX") + async def test_skips_instruments_with_no_valid_rows(self, mock_ctx): + """Тест пропуска инструментов без валидных строк.""" + mock_ctx.pytz_timezone = pytz.timezone("Europe/Moscow") + mock_logger = MagicMock() + mock_ctx.logger = mock_logger + + mock_repo = AsyncMock() + + mock_section = MagicMock() + mock_repo.get_section_by_name = AsyncMock(return_value=mock_section) + + mock_quote = MagicMock() + mock_repo.upsert_quote = AsyncMock(return_value=mock_quote) + + mock_repo.bulk_upsert_quote_values = AsyncMock() + + source_data = { + "instrument1": { + "invalid_ts": "invalid_data", + }, + } + + await _process_source(mock_repo, "cbr", source_data) + + mock_repo.upsert_quote.assert_called_once() + mock_repo.bulk_upsert_quote_values.assert_not_called() + + +@pytest.mark.unit +class TestLoadTeneraPipeline: + """Тесты для пайплайна load_tenera.""" + + @pytest.mark.asyncio + @patch("dataloader.workers.pipelines.load_tenera.APP_CTX") + async def test_full_pipeline_success(self, mock_ctx): + """Тест успешного выполнения полного пайплайна.""" + mock_ctx.pytz_timezone = pytz.timezone("Europe/Moscow") + mock_logger = MagicMock() + mock_ctx.logger = mock_logger + + mock_tenera = AsyncMock() + mock_tenera.__aenter__ = AsyncMock(return_value=mock_tenera) + mock_tenera.__aexit__ = AsyncMock() + + mock_data = MagicMock(spec=MainData) + mock_data.cbr = {"USD": {"1609459200": TimePointUnion(root=CbrTimePoint(value="75.0"))}} + mock_data.investing = {} + mock_data.sgx = {} + mock_data.tradingeconomics = {} + mock_data.bloomberg = {} + mock_data.trading_view = {} + + mock_tenera.get_quotes_data = AsyncMock(return_value=mock_data) + + mock_session = AsyncMock() + mock_sessionmaker = MagicMock() + mock_sessionmaker.return_value.__aenter__ = AsyncMock(return_value=mock_session) + mock_sessionmaker.return_value.__aexit__ = AsyncMock() + + mock_ctx.sessionmaker = mock_sessionmaker + + mock_repo = AsyncMock() + mock_section = MagicMock() + mock_section.section_id = 1 + mock_repo.get_section_by_name = AsyncMock(return_value=mock_section) + + mock_quote = MagicMock() + mock_quote.quote_id = 1 + mock_repo.upsert_quote = AsyncMock(return_value=mock_quote) + + mock_repo.bulk_upsert_quote_values = AsyncMock() + + with ( + patch( + "dataloader.workers.pipelines.load_tenera.get_async_tenera_interface", + return_value=mock_tenera, + ), + patch( + "dataloader.workers.pipelines.load_tenera.QuotesRepository", + return_value=mock_repo, + ), + ): + steps = [] + async for _ in load_tenera({}): + steps.append("step") + + assert len(steps) >= 1 + + mock_tenera.get_quotes_data.assert_called_once() + mock_repo.get_section_by_name.assert_called() + mock_session.commit.assert_called() + + @pytest.mark.asyncio + @patch("dataloader.workers.pipelines.load_tenera.APP_CTX") + async def test_pipeline_processes_multiple_sources(self, mock_ctx): + """Тест обработки нескольких источников.""" + mock_ctx.pytz_timezone = pytz.timezone("Europe/Moscow") + mock_logger = MagicMock() + mock_ctx.logger = mock_logger + + mock_tenera = AsyncMock() + mock_tenera.__aenter__ = AsyncMock(return_value=mock_tenera) + mock_tenera.__aexit__ = AsyncMock() + + mock_data = MagicMock(spec=MainData) + mock_data.cbr = {"USD": {}} + mock_data.investing = {"SPX": {}} + mock_data.sgx = {} + mock_data.tradingeconomics = {} + mock_data.bloomberg = {} + mock_data.trading_view = {} + + mock_tenera.get_quotes_data = AsyncMock(return_value=mock_data) + + mock_session = AsyncMock() + mock_sessionmaker = MagicMock() + mock_sessionmaker.return_value.__aenter__ = AsyncMock(return_value=mock_session) + mock_sessionmaker.return_value.__aexit__ = AsyncMock() + + mock_ctx.sessionmaker = mock_sessionmaker + + mock_repo = AsyncMock() + mock_section = MagicMock() + mock_repo.get_section_by_name = AsyncMock(return_value=mock_section) + mock_quote = MagicMock() + mock_repo.upsert_quote = AsyncMock(return_value=mock_quote) + mock_repo.bulk_upsert_quote_values = AsyncMock() + + with ( + patch( + "dataloader.workers.pipelines.load_tenera.get_async_tenera_interface", + return_value=mock_tenera, + ), + patch( + "dataloader.workers.pipelines.load_tenera.QuotesRepository", + return_value=mock_repo, + ), + ): + async for _ in load_tenera({}): + pass + + assert mock_repo.get_section_by_name.call_count >= 2