124 lines
4.0 KiB
Python
124 lines
4.0 KiB
Python
"""
|
||
Интеграционные тесты для пайплайна load_opu.
|
||
|
||
ВНИМАНИЕ: Эти тесты требуют работающего Gmap2Brief API и настоящего соединения с БД.
|
||
По умолчанию они исключены из запуска через pytest.mark.skip.
|
||
Для запуска используйте: pytest tests/integration_tests/test_pipeline_load_opu_integration.py --no-skip
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import pytest
|
||
|
||
from dataloader.context import APP_CTX
|
||
from dataloader.interfaces.gmap2_brief.interface import get_gmap2brief_interface
|
||
from dataloader.storage.repositories.opu import OpuRepository
|
||
from dataloader.workers.pipelines.load_opu import load_opu
|
||
|
||
|
||
@pytest.mark.integration
|
||
@pytest.mark.skip(reason="Requires working Gmap2Brief API - run manually when service is available")
|
||
class TestLoadOpuIntegration:
|
||
"""Интеграционные тесты для пайплайна load_opu."""
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_full_opu_pipeline_with_real_api(self, db_session):
|
||
"""
|
||
Тест полного пайплайна OPU с реальным API.
|
||
|
||
Требования:
|
||
- Gmap2Brief API должен быть доступен
|
||
- База данных должна быть настроена
|
||
- Схема OPU должна существовать
|
||
"""
|
||
interface = get_gmap2brief_interface()
|
||
|
||
try:
|
||
job_id = await interface.start_export()
|
||
assert job_id is not None
|
||
assert isinstance(job_id, str)
|
||
|
||
status = await interface.wait_for_completion(job_id, max_wait=300)
|
||
assert status.status == "completed"
|
||
assert status.total_rows > 0
|
||
|
||
except Exception as e:
|
||
pytest.skip(f"Gmap2Brief API not available: {e}")
|
||
|
||
steps = 0
|
||
async for _ in load_opu({}):
|
||
steps += 1
|
||
|
||
assert steps > 0
|
||
|
||
async with APP_CTX.sessionmaker() as session:
|
||
repo = OpuRepository(session)
|
||
|
||
result = await session.execute(
|
||
"SELECT COUNT(*) FROM opu.brief_digital_certificate_opu"
|
||
)
|
||
count = result.scalar()
|
||
|
||
assert count > 0
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_opu_repository_truncate(self, db_session):
|
||
"""
|
||
Тест операции TRUNCATE репозитория OPU.
|
||
|
||
Требование: схема OPU должна существовать в БД.
|
||
"""
|
||
repo = OpuRepository(db_session)
|
||
|
||
await repo.truncate()
|
||
await db_session.commit()
|
||
|
||
result = await db_session.execute(
|
||
"SELECT COUNT(*) FROM opu.brief_digital_certificate_opu"
|
||
)
|
||
count = result.scalar()
|
||
|
||
assert count == 0
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_opu_repository_bulk_insert(self, db_session):
|
||
"""
|
||
Тест массовой вставки данных в репозиторий OPU.
|
||
|
||
Требование: схема OPU должна существовать в БД.
|
||
"""
|
||
repo = OpuRepository(db_session)
|
||
|
||
await repo.truncate()
|
||
await db_session.commit()
|
||
|
||
test_records = [
|
||
{
|
||
"object_id": f"test_{i}",
|
||
"desk_nm": "TEST_DESK",
|
||
"actdate": "2025-01-15",
|
||
"layer_cd": "LAYER1",
|
||
"opu_cd": "OPU1",
|
||
"opu_lvl": 1,
|
||
"opu_prnt_cd": "PARENT",
|
||
"object_unit": "UNIT1",
|
||
"opu_nm": f"Test OPU {i}",
|
||
}
|
||
for i in range(10)
|
||
]
|
||
|
||
inserted = await repo.bulk_insert(test_records)
|
||
await db_session.commit()
|
||
|
||
assert inserted == 10
|
||
|
||
result = await db_session.execute(
|
||
"SELECT COUNT(*) FROM opu.brief_digital_certificate_opu WHERE desk_nm = 'TEST_DESK'"
|
||
)
|
||
count = result.scalar()
|
||
|
||
assert count == 10
|
||
|
||
await repo.truncate()
|
||
await db_session.commit()
|