dataloader/tests/integration_tests/test_pipeline_load_opu_inte...

124 lines
4.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Интеграционные тесты для пайплайна load_opu.
ВНИМАНИЕ: Эти тесты требуют работающего Gmap2Brief API и настоящего соединения с БД.
По умолчанию они исключены из запуска через pytest.mark.skip.
Для запуска используйте: pytest tests/integration_tests/test_pipeline_load_opu_integration.py --no-skip
"""
from __future__ import annotations
import pytest
from dataloader.context import APP_CTX
from dataloader.interfaces.gmap2_brief.interface import get_gmap2brief_interface
from dataloader.storage.repositories.opu import OpuRepository
from dataloader.workers.pipelines.load_opu import load_opu
@pytest.mark.integration
@pytest.mark.skip(reason="Requires working Gmap2Brief API - run manually when service is available")
class TestLoadOpuIntegration:
"""Интеграционные тесты для пайплайна load_opu."""
@pytest.mark.asyncio
async def test_full_opu_pipeline_with_real_api(self, db_session):
"""
Тест полного пайплайна OPU с реальным API.
Требования:
- Gmap2Brief API должен быть доступен
- База данных должна быть настроена
- Схема OPU должна существовать
"""
interface = get_gmap2brief_interface()
try:
job_id = await interface.start_export()
assert job_id is not None
assert isinstance(job_id, str)
status = await interface.wait_for_completion(job_id, max_wait=300)
assert status.status == "completed"
assert status.total_rows > 0
except Exception as e:
pytest.skip(f"Gmap2Brief API not available: {e}")
steps = 0
async for _ in load_opu({}):
steps += 1
assert steps > 0
async with APP_CTX.sessionmaker() as session:
repo = OpuRepository(session)
result = await session.execute(
"SELECT COUNT(*) FROM opu.brief_digital_certificate_opu"
)
count = result.scalar()
assert count > 0
@pytest.mark.asyncio
async def test_opu_repository_truncate(self, db_session):
"""
Тест операции TRUNCATE репозитория OPU.
Требование: схема OPU должна существовать в БД.
"""
repo = OpuRepository(db_session)
await repo.truncate()
await db_session.commit()
result = await db_session.execute(
"SELECT COUNT(*) FROM opu.brief_digital_certificate_opu"
)
count = result.scalar()
assert count == 0
@pytest.mark.asyncio
async def test_opu_repository_bulk_insert(self, db_session):
"""
Тест массовой вставки данных в репозиторий OPU.
Требование: схема OPU должна существовать в БД.
"""
repo = OpuRepository(db_session)
await repo.truncate()
await db_session.commit()
test_records = [
{
"object_id": f"test_{i}",
"desk_nm": "TEST_DESK",
"actdate": "2025-01-15",
"layer_cd": "LAYER1",
"opu_cd": "OPU1",
"opu_lvl": 1,
"opu_prnt_cd": "PARENT",
"object_unit": "UNIT1",
"opu_nm": f"Test OPU {i}",
}
for i in range(10)
]
inserted = await repo.bulk_insert(test_records)
await db_session.commit()
assert inserted == 10
result = await db_session.execute(
"SELECT COUNT(*) FROM opu.brief_digital_certificate_opu WHERE desk_nm = 'TEST_DESK'"
)
count = result.scalar()
assert count == 10
await repo.truncate()
await db_session.commit()