dataloader/tests/integration_tests/test_pipeline_load_tenera_i...

190 lines
6.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Интеграционные тесты для пайплайна load_tenera.
ВНИМАНИЕ: Эти тесты требуют работающего SuperTenera API и настоящего соединения с БД.
По умолчанию они исключены из запуска через pytest.mark.skip.
Для запуска используйте: pytest tests/integration_tests/test_pipeline_load_tenera_integration.py --no-skip
"""
from __future__ import annotations
import pytest
from dataloader.context import APP_CTX
from dataloader.interfaces.tenera.interface import get_async_tenera_interface
from dataloader.storage.repositories.quotes import QuotesRepository
from dataloader.workers.pipelines.load_tenera import load_tenera
@pytest.mark.integration
@pytest.mark.skip(reason="Requires working SuperTenera API - run manually when service is available")
class TestLoadTeneraIntegration:
"""Интеграционные тесты для пайплайна load_tenera."""
@pytest.mark.asyncio
async def test_full_tenera_pipeline_with_real_api(self, db_session):
"""
Тест полного пайплайна TENERA с реальным API.
Требования:
- SuperTenera API должен быть доступен
- База данных должна быть настроена
- Схема quotes должна существовать
- Таблицы quote_section, quote, quote_value должны существовать
"""
try:
async with get_async_tenera_interface() as tenera:
data = await tenera.get_quotes_data()
assert data is not None
except Exception as e:
pytest.skip(f"SuperTenera API not available: {e}")
steps = 0
async for _ in load_tenera({}):
steps += 1
assert steps > 0
async with APP_CTX.sessionmaker() as session:
result = await session.execute("SELECT COUNT(*) FROM quotes.quote_value")
count = result.scalar()
assert count > 0
@pytest.mark.asyncio
async def test_tenera_interface_get_quotes_data(self):
"""
Тест получения данных котировок из SuperTenera API.
Требование: SuperTenera API должен быть доступен.
"""
try:
async with get_async_tenera_interface() as tenera:
data = await tenera.get_quotes_data()
assert data is not None
assert hasattr(data, "cbr")
assert hasattr(data, "investing")
assert hasattr(data, "sgx")
assert hasattr(data, "tradingeconomics")
assert hasattr(data, "bloomberg")
assert hasattr(data, "trading_view")
except Exception as e:
pytest.skip(f"SuperTenera API not available: {e}")
@pytest.mark.asyncio
async def test_quotes_repository_get_section_by_name(self, db_session):
"""
Тест получения секции по имени.
Требование: схема quotes и таблица quote_section должны существовать в БД.
"""
repo = QuotesRepository(db_session)
section = await repo.get_section_by_name("cbr")
if section is not None:
assert section.section_nm == "cbr"
assert section.section_id is not None
else:
pytest.skip("Section 'cbr' not found in database - seed data required")
@pytest.mark.asyncio
async def test_quotes_repository_upsert_quote(self, db_session):
"""
Тест upsert котировки.
Требование: схема quotes должна существовать в БД.
"""
repo = QuotesRepository(db_session)
section = await repo.get_section_by_name("cbr")
if section is None:
pytest.skip("Section 'cbr' not found - cannot test upsert")
from datetime import datetime
quote = await repo.upsert_quote(
section=section,
name="TEST_USD",
last_update_dttm=datetime.now(),
)
assert quote is not None
assert quote.quote_nm == "TEST_USD"
assert quote.section_id == section.section_id
quote2 = await repo.upsert_quote(
section=section,
name="TEST_USD",
last_update_dttm=datetime.now(),
)
assert quote2.quote_id == quote.quote_id
@pytest.mark.asyncio
async def test_quotes_repository_bulk_upsert_quote_values(self, db_session):
"""
Тест массового upsert значений котировок.
Требование: схема quotes должна существовать в БД.
"""
repo = QuotesRepository(db_session)
section = await repo.get_section_by_name("cbr")
if section is None:
pytest.skip("Section 'cbr' not found - cannot test bulk upsert")
from datetime import datetime
quote = await repo.upsert_quote(
section=section,
name="TEST_BULK_USD",
last_update_dttm=datetime.now(),
)
test_rows = [
{
"dt": datetime(2025, 1, 15, i, 0, 0),
"value_base": 75.0 + i,
}
for i in range(5)
]
await repo.bulk_upsert_quote_values(quote, test_rows)
await db_session.commit()
result = await db_session.execute(
"SELECT COUNT(*) FROM quotes.quote_value WHERE quote_id = :quote_id",
{"quote_id": quote.quote_id},
)
count = result.scalar()
assert count == 5
@pytest.mark.asyncio
async def test_tenera_pipeline_processes_all_sources(self, db_session):
"""
Тест что пайплайн обрабатывает все источники.
Требования:
- SuperTenera API должен быть доступен
- Все секции должны существовать в БД
"""
try:
async with get_async_tenera_interface() as tenera:
data = await tenera.get_quotes_data()
sources_with_data = []
for source_name in ["cbr", "investing", "sgx", "tradingeconomics", "bloomberg", "trading_view"]:
source_data = getattr(data, source_name, None)
if source_data:
sources_with_data.append(source_name)
assert len(sources_with_data) > 0
except Exception as e:
pytest.skip(f"SuperTenera API not available: {e}")