""" Интеграционные тесты для пайплайна load_opu. ВНИМАНИЕ: Эти тесты требуют работающего Gmap2Brief API и настоящего соединения с БД. По умолчанию они исключены из запуска через pytest.mark.skip. Для запуска используйте: pytest tests/integration_tests/test_pipeline_load_opu_integration.py --no-skip """ from __future__ import annotations import pytest from dataloader.context import APP_CTX from dataloader.interfaces.gmap2_brief.interface import get_gmap2brief_interface from dataloader.storage.repositories.opu import OpuRepository from dataloader.workers.pipelines.load_opu import load_opu @pytest.mark.integration @pytest.mark.skip(reason="Requires working Gmap2Brief API - run manually when service is available") class TestLoadOpuIntegration: """Интеграционные тесты для пайплайна load_opu.""" @pytest.mark.asyncio async def test_full_opu_pipeline_with_real_api(self, db_session): """ Тест полного пайплайна OPU с реальным API. Требования: - Gmap2Brief API должен быть доступен - База данных должна быть настроена - Схема OPU должна существовать """ interface = get_gmap2brief_interface() try: job_id = await interface.start_export() assert job_id is not None assert isinstance(job_id, str) status = await interface.wait_for_completion(job_id, max_wait=300) assert status.status == "completed" assert status.total_rows > 0 except Exception as e: pytest.skip(f"Gmap2Brief API not available: {e}") steps = 0 async for _ in load_opu({}): steps += 1 assert steps > 0 async with APP_CTX.sessionmaker() as session: repo = OpuRepository(session) result = await session.execute( "SELECT COUNT(*) FROM opu.brief_digital_certificate_opu" ) count = result.scalar() assert count > 0 @pytest.mark.asyncio async def test_opu_repository_truncate(self, db_session): """ Тест операции TRUNCATE репозитория OPU. Требование: схема OPU должна существовать в БД. """ repo = OpuRepository(db_session) await repo.truncate() await db_session.commit() result = await db_session.execute( "SELECT COUNT(*) FROM opu.brief_digital_certificate_opu" ) count = result.scalar() assert count == 0 @pytest.mark.asyncio async def test_opu_repository_bulk_insert(self, db_session): """ Тест массовой вставки данных в репозиторий OPU. Требование: схема OPU должна существовать в БД. """ repo = OpuRepository(db_session) await repo.truncate() await db_session.commit() test_records = [ { "object_id": f"test_{i}", "desk_nm": "TEST_DESK", "actdate": "2025-01-15", "layer_cd": "LAYER1", "opu_cd": "OPU1", "opu_lvl": 1, "opu_prnt_cd": "PARENT", "object_unit": "UNIT1", "opu_nm": f"Test OPU {i}", } for i in range(10) ] inserted = await repo.bulk_insert(test_records) await db_session.commit() assert inserted == 10 result = await db_session.execute( "SELECT COUNT(*) FROM opu.brief_digital_certificate_opu WHERE desk_nm = 'TEST_DESK'" ) count = result.scalar() assert count == 10 await repo.truncate() await db_session.commit()