"""Unit тесты для пайплайна load_opu.""" from __future__ import annotations import tempfile from datetime import date, datetime from pathlib import Path from unittest.mock import AsyncMock, MagicMock, Mock, patch import orjson import pytest import zstandard as zstd from dataloader.interfaces.gmap2_brief.schemas import ExportJobStatus from dataloader.workers.pipelines.load_opu import ( _convert_record, _parse_jsonl_from_zst, load_opu, ) @pytest.mark.unit class TestParseJsonlFromZst: """Тесты для функции _parse_jsonl_from_zst.""" def test_parses_valid_zst_file_with_small_batch(self): """Тест парсинга валидного zst файла с небольшим батчем.""" records = [ {"id": 1, "name": "test1"}, {"id": 2, "name": "test2"}, {"id": 3, "name": "test3"}, ] lines = [orjson.dumps(r) for r in records] content = b"\n".join(lines) with tempfile.TemporaryDirectory() as tmpdir: file_path = Path(tmpdir) / "test.jsonl.zst" cctx = zstd.ZstdCompressor() compressed = cctx.compress(content) with open(file_path, "wb") as f: f.write(compressed) batches = list(_parse_jsonl_from_zst(file_path, chunk_size=2)) assert len(batches) == 2 assert batches[0] == [{"id": 1, "name": "test1"}, {"id": 2, "name": "test2"}] assert batches[1] == [{"id": 3, "name": "test3"}] def test_parses_empty_file(self): """Тест парсинга пустого файла.""" with tempfile.TemporaryDirectory() as tmpdir: file_path = Path(tmpdir) / "empty.jsonl.zst" cctx = zstd.ZstdCompressor() compressed = cctx.compress(b"") with open(file_path, "wb") as f: f.write(compressed) batches = list(_parse_jsonl_from_zst(file_path, chunk_size=100)) assert len(batches) == 0 def test_skips_empty_lines(self): """Тест пропуска пустых строк.""" records = [ {"id": 1}, {"id": 2}, ] lines = [orjson.dumps(records[0]), b"", b" ", orjson.dumps(records[1])] content = b"\n".join(lines) with tempfile.TemporaryDirectory() as tmpdir: file_path = Path(tmpdir) / "test.jsonl.zst" cctx = zstd.ZstdCompressor() compressed = cctx.compress(content) with open(file_path, "wb") as f: f.write(compressed) batches = list(_parse_jsonl_from_zst(file_path, chunk_size=10)) assert len(batches) == 1 assert batches[0] == [{"id": 1}, {"id": 2}] @patch("dataloader.workers.pipelines.load_opu.APP_CTX") def test_handles_invalid_json_gracefully(self, mock_ctx): """Тест обработки невалидного JSON.""" mock_logger = MagicMock() mock_ctx.logger = mock_logger lines = [ orjson.dumps({"id": 1}), b"{invalid json}", orjson.dumps({"id": 2}), ] content = b"\n".join(lines) with tempfile.TemporaryDirectory() as tmpdir: file_path = Path(tmpdir) / "test.jsonl.zst" cctx = zstd.ZstdCompressor() compressed = cctx.compress(content) with open(file_path, "wb") as f: f.write(compressed) batches = list(_parse_jsonl_from_zst(file_path, chunk_size=10)) assert len(batches) == 1 assert batches[0] == [{"id": 1}, {"id": 2}] mock_logger.warning.assert_called() @pytest.mark.unit class TestConvertRecord: """Тесты для функции _convert_record.""" def test_converts_actdate_string_to_date(self): """Тест конвертации actdate из строки в date.""" raw = { "id": 1, "actdate": "2025-01-15", "name": "test", } result = _convert_record(raw) assert result["id"] == 1 assert result["actdate"] == date(2025, 1, 15) assert result["name"] == "test" def test_converts_wf_load_dttm_string_to_datetime(self): """Тест конвертации wf_load_dttm из строки в datetime.""" raw = { "id": 1, "wf_load_dttm": "2025-01-15T12:30:45", } result = _convert_record(raw) assert result["id"] == 1 assert result["wf_load_dttm"] == datetime(2025, 1, 15, 12, 30, 45) def 
@pytest.mark.unit
class TestConvertRecord:
    """Tests for the _convert_record function."""

    def test_converts_actdate_string_to_date(self):
        """Converts actdate from an ISO string to a date object."""
        raw = {
            "id": 1,
            "actdate": "2025-01-15",
            "name": "test",
        }

        result = _convert_record(raw)

        assert result["id"] == 1
        assert result["actdate"] == date(2025, 1, 15)
        assert result["name"] == "test"

    def test_converts_wf_load_dttm_string_to_datetime(self):
        """Converts wf_load_dttm from an ISO string to a datetime object."""
        raw = {
            "id": 1,
            "wf_load_dttm": "2025-01-15T12:30:45",
        }

        result = _convert_record(raw)

        assert result["id"] == 1
        assert result["wf_load_dttm"] == datetime(2025, 1, 15, 12, 30, 45)

    def test_keeps_non_date_fields_unchanged(self):
        """Leaves fields that need no conversion untouched."""
        raw = {
            "id": 1,
            "name": "test",
            "value": 123.45,
        }

        result = _convert_record(raw)

        assert result == raw

    def test_handles_already_converted_dates(self):
        """Passes through values that are already date/datetime objects."""
        actdate_obj = date(2025, 1, 15)
        wf_load_dttm_obj = datetime(2025, 1, 15, 12, 30, 45)
        raw = {
            "id": 1,
            "actdate": actdate_obj,
            "wf_load_dttm": wf_load_dttm_obj,
        }

        result = _convert_record(raw)

        assert result["actdate"] == actdate_obj
        assert result["wf_load_dttm"] == wf_load_dttm_obj


@pytest.mark.unit
class TestLoadOpuPipeline:
    """Tests for the load_opu pipeline."""

    @pytest.mark.asyncio
    async def test_full_pipeline_success(self):
        """Runs the full pipeline successfully end to end."""
        mock_interface = AsyncMock()
        mock_interface.start_export = AsyncMock(return_value="job123")
        mock_interface.wait_for_completion = AsyncMock(
            return_value=ExportJobStatus(
                job_id="job123",
                status="completed",
                total_rows=100,
            )
        )

        with tempfile.TemporaryDirectory() as tmpdir:
            archive_path = Path(tmpdir) / "test.jsonl.zst"
            records = [
                {"id": i, "actdate": "2025-01-15", "name": f"test{i}"}
                for i in range(10)
            ]
            lines = [orjson.dumps(r) for r in records]
            content = b"\n".join(lines)
            cctx = zstd.ZstdCompressor()
            compressed = cctx.compress(content)
            with open(archive_path, "wb") as f:
                f.write(compressed)

            async def mock_download(job_id: str, output_path: Path):
                with open(archive_path, "rb") as src, open(output_path, "wb") as dst:
                    dst.write(src.read())

            mock_interface.download_export = AsyncMock(side_effect=mock_download)

            mock_session = AsyncMock()
            mock_sessionmaker = MagicMock()
            mock_sessionmaker.return_value.__aenter__ = AsyncMock(
                return_value=mock_session
            )
            # __aexit__ must return a falsy value; a bare AsyncMock() resolves to
            # a truthy MagicMock, which would silently swallow exceptions raised
            # inside the mocked ``async with`` block.
            mock_sessionmaker.return_value.__aexit__ = AsyncMock(return_value=False)

            mock_repo = AsyncMock()
            mock_repo.truncate = AsyncMock()
            mock_repo.bulk_insert = AsyncMock(return_value=10)

            mock_app_ctx = MagicMock()
            mock_app_ctx.logger = MagicMock()
            mock_app_ctx.sessionmaker = mock_sessionmaker

            with (
                patch(
                    "dataloader.workers.pipelines.load_opu.get_gmap2brief_interface",
                    return_value=mock_interface,
                ),
                patch(
                    "dataloader.workers.pipelines.load_opu.OpuRepository",
                    return_value=mock_repo,
                ),
                patch("dataloader.workers.pipelines.load_opu.APP_CTX", mock_app_ctx),
            ):
                steps = []
                async for _ in load_opu({}):
                    steps.append("step")

            assert len(steps) >= 4
            mock_interface.start_export.assert_called_once()
            mock_interface.wait_for_completion.assert_called_once_with("job123")
            mock_interface.download_export.assert_called_once()
            mock_repo.truncate.assert_called_once()
            mock_repo.bulk_insert.assert_called()
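

# ---------------------------------------------------------------------------
# Pipeline shape assumed by TestLoadOpuPipeline (illustrative only).
#
# The assertions above imply that load_opu is an async generator yielding at
# least once per stage: start the export, wait for completion, download the
# archive, truncate the target table, then bulk-insert the parsed batches.
# The skeleton below is a hypothetical sketch of that shape; the parameter
# list, yielded payloads, and stage names are assumptions, not the real
# dataloader code (which resolves its dependencies internally).
# ---------------------------------------------------------------------------
async def _reference_load_opu_stages(
    interface, repo, archive_path: Path, chunk_size: int = 1000
):
    """Hypothetical stage-by-stage skeleton mirroring the mocked call order."""
    job_id = await interface.start_export()
    yield "export_started"

    await interface.wait_for_completion(job_id)
    yield "export_completed"

    await interface.download_export(job_id, archive_path)
    yield "downloaded"

    await repo.truncate()
    for batch in _parse_jsonl_from_zst(archive_path, chunk_size=chunk_size):
        await repo.bulk_insert([_convert_record(r) for r in batch])
    yield "loaded"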