190 lines
6.7 KiB
Python
190 lines
6.7 KiB
Python
"""
|
||
Интеграционные тесты для пайплайна load_tenera.
|
||
|
||
ВНИМАНИЕ: Эти тесты требуют работающего SuperTenera API и настоящего соединения с БД.
|
||
По умолчанию они исключены из запуска через pytest.mark.skip.
|
||
Для запуска используйте: pytest tests/integration_tests/test_pipeline_load_tenera_integration.py --no-skip
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import pytest
|
||
|
||
from dataloader.context import APP_CTX
|
||
from dataloader.interfaces.tenera.interface import get_async_tenera_interface
|
||
from dataloader.storage.repositories.quotes import QuotesRepository
|
||
from dataloader.workers.pipelines.load_tenera import load_tenera
|
||
|
||
|
||
@pytest.mark.integration
|
||
@pytest.mark.skip(reason="Requires working SuperTenera API - run manually when service is available")
|
||
class TestLoadTeneraIntegration:
|
||
"""Интеграционные тесты для пайплайна load_tenera."""
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_full_tenera_pipeline_with_real_api(self, db_session):
|
||
"""
|
||
Тест полного пайплайна TENERA с реальным API.
|
||
|
||
Требования:
|
||
- SuperTenera API должен быть доступен
|
||
- База данных должна быть настроена
|
||
- Схема quotes должна существовать
|
||
- Таблицы quote_section, quote, quote_value должны существовать
|
||
"""
|
||
try:
|
||
async with get_async_tenera_interface() as tenera:
|
||
data = await tenera.get_quotes_data()
|
||
assert data is not None
|
||
|
||
except Exception as e:
|
||
pytest.skip(f"SuperTenera API not available: {e}")
|
||
|
||
steps = 0
|
||
async for _ in load_tenera({}):
|
||
steps += 1
|
||
|
||
assert steps > 0
|
||
|
||
async with APP_CTX.sessionmaker() as session:
|
||
result = await session.execute("SELECT COUNT(*) FROM quotes.quote_value")
|
||
count = result.scalar()
|
||
|
||
assert count > 0
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_tenera_interface_get_quotes_data(self):
|
||
"""
|
||
Тест получения данных котировок из SuperTenera API.
|
||
|
||
Требование: SuperTenera API должен быть доступен.
|
||
"""
|
||
try:
|
||
async with get_async_tenera_interface() as tenera:
|
||
data = await tenera.get_quotes_data()
|
||
|
||
assert data is not None
|
||
assert hasattr(data, "cbr")
|
||
assert hasattr(data, "investing")
|
||
assert hasattr(data, "sgx")
|
||
assert hasattr(data, "tradingeconomics")
|
||
assert hasattr(data, "bloomberg")
|
||
assert hasattr(data, "trading_view")
|
||
|
||
except Exception as e:
|
||
pytest.skip(f"SuperTenera API not available: {e}")
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_quotes_repository_get_section_by_name(self, db_session):
|
||
"""
|
||
Тест получения секции по имени.
|
||
|
||
Требование: схема quotes и таблица quote_section должны существовать в БД.
|
||
"""
|
||
repo = QuotesRepository(db_session)
|
||
|
||
section = await repo.get_section_by_name("cbr")
|
||
|
||
if section is not None:
|
||
assert section.section_nm == "cbr"
|
||
assert section.section_id is not None
|
||
else:
|
||
pytest.skip("Section 'cbr' not found in database - seed data required")
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_quotes_repository_upsert_quote(self, db_session):
|
||
"""
|
||
Тест upsert котировки.
|
||
|
||
Требование: схема quotes должна существовать в БД.
|
||
"""
|
||
repo = QuotesRepository(db_session)
|
||
|
||
section = await repo.get_section_by_name("cbr")
|
||
if section is None:
|
||
pytest.skip("Section 'cbr' not found - cannot test upsert")
|
||
|
||
from datetime import datetime
|
||
|
||
quote = await repo.upsert_quote(
|
||
section=section,
|
||
name="TEST_USD",
|
||
last_update_dttm=datetime.now(),
|
||
)
|
||
|
||
assert quote is not None
|
||
assert quote.quote_nm == "TEST_USD"
|
||
assert quote.section_id == section.section_id
|
||
|
||
quote2 = await repo.upsert_quote(
|
||
section=section,
|
||
name="TEST_USD",
|
||
last_update_dttm=datetime.now(),
|
||
)
|
||
|
||
assert quote2.quote_id == quote.quote_id
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_quotes_repository_bulk_upsert_quote_values(self, db_session):
|
||
"""
|
||
Тест массового upsert значений котировок.
|
||
|
||
Требование: схема quotes должна существовать в БД.
|
||
"""
|
||
repo = QuotesRepository(db_session)
|
||
|
||
section = await repo.get_section_by_name("cbr")
|
||
if section is None:
|
||
pytest.skip("Section 'cbr' not found - cannot test bulk upsert")
|
||
|
||
from datetime import datetime
|
||
|
||
quote = await repo.upsert_quote(
|
||
section=section,
|
||
name="TEST_BULK_USD",
|
||
last_update_dttm=datetime.now(),
|
||
)
|
||
|
||
test_rows = [
|
||
{
|
||
"dt": datetime(2025, 1, 15, i, 0, 0),
|
||
"value_base": 75.0 + i,
|
||
}
|
||
for i in range(5)
|
||
]
|
||
|
||
await repo.bulk_upsert_quote_values(quote, test_rows)
|
||
await db_session.commit()
|
||
|
||
result = await db_session.execute(
|
||
"SELECT COUNT(*) FROM quotes.quote_value WHERE quote_id = :quote_id",
|
||
{"quote_id": quote.quote_id},
|
||
)
|
||
count = result.scalar()
|
||
|
||
assert count == 5
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_tenera_pipeline_processes_all_sources(self, db_session):
|
||
"""
|
||
Тест что пайплайн обрабатывает все источники.
|
||
|
||
Требования:
|
||
- SuperTenera API должен быть доступен
|
||
- Все секции должны существовать в БД
|
||
"""
|
||
try:
|
||
async with get_async_tenera_interface() as tenera:
|
||
data = await tenera.get_quotes_data()
|
||
|
||
sources_with_data = []
|
||
for source_name in ["cbr", "investing", "sgx", "tradingeconomics", "bloomberg", "trading_view"]:
|
||
source_data = getattr(data, source_name, None)
|
||
if source_data:
|
||
sources_with_data.append(source_name)
|
||
|
||
assert len(sources_with_data) > 0
|
||
|
||
except Exception as e:
|
||
pytest.skip(f"SuperTenera API not available: {e}")
|