# dataloader/tests/unit/test_pipeline_load_opu.py

"""Unit tests for the load_opu pipeline."""
from __future__ import annotations
import tempfile
from datetime import date, datetime
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, Mock, patch
import orjson
import pytest
import zstandard as zstd
from dataloader.interfaces.gmap2_brief.schemas import ExportJobStatus
from dataloader.workers.pipelines.load_opu import (
_convert_record,
_parse_jsonl_from_zst,
load_opu,
)
@pytest.mark.unit
class TestParseJsonlFromZst:
    """Tests for the _parse_jsonl_from_zst helper."""

    @staticmethod
    def _write_archive(tmpdir: str, payload: bytes, name: str = "test.jsonl.zst") -> Path:
        # Compress the raw payload with zstd and persist it so the parser
        # can stream it back from disk.
        archive_path = Path(tmpdir) / name
        archive_path.write_bytes(zstd.ZstdCompressor().compress(payload))
        return archive_path

    def test_parses_valid_zst_file_with_small_batch(self):
        """A valid archive is yielded in batches of at most chunk_size records."""
        source_records = [
            {"id": 1, "name": "test1"},
            {"id": 2, "name": "test2"},
            {"id": 3, "name": "test3"},
        ]
        payload = b"\n".join(orjson.dumps(record) for record in source_records)
        with tempfile.TemporaryDirectory() as tmpdir:
            archive_path = self._write_archive(tmpdir, payload)
            batches = list(_parse_jsonl_from_zst(archive_path, chunk_size=2))
        assert len(batches) == 2
        assert batches[0] == [{"id": 1, "name": "test1"}, {"id": 2, "name": "test2"}]
        assert batches[1] == [{"id": 3, "name": "test3"}]

    def test_parses_empty_file(self):
        """An empty archive yields no batches at all."""
        with tempfile.TemporaryDirectory() as tmpdir:
            archive_path = self._write_archive(tmpdir, b"", name="empty.jsonl.zst")
            batches = list(_parse_jsonl_from_zst(archive_path, chunk_size=100))
        assert len(batches) == 0

    def test_skips_empty_lines(self):
        """Blank and whitespace-only lines are ignored by the parser."""
        payload = b"\n".join(
            [orjson.dumps({"id": 1}), b"", b" ", orjson.dumps({"id": 2})]
        )
        with tempfile.TemporaryDirectory() as tmpdir:
            archive_path = self._write_archive(tmpdir, payload)
            batches = list(_parse_jsonl_from_zst(archive_path, chunk_size=10))
        assert len(batches) == 1
        assert batches[0] == [{"id": 1}, {"id": 2}]

    @patch("dataloader.workers.pipelines.load_opu.APP_CTX")
    def test_handles_invalid_json_gracefully(self, mock_ctx):
        """A malformed JSON line is skipped with a warning instead of aborting."""
        mock_logger = MagicMock()
        mock_ctx.logger = mock_logger
        payload = b"\n".join(
            [orjson.dumps({"id": 1}), b"{invalid json}", orjson.dumps({"id": 2})]
        )
        with tempfile.TemporaryDirectory() as tmpdir:
            archive_path = self._write_archive(tmpdir, payload)
            batches = list(_parse_jsonl_from_zst(archive_path, chunk_size=10))
        assert len(batches) == 1
        assert batches[0] == [{"id": 1}, {"id": 2}]
        mock_logger.warning.assert_called()
@pytest.mark.unit
class TestConvertRecord:
    """Tests for the _convert_record helper."""

    def test_converts_actdate_string_to_date(self):
        """An ISO 'actdate' string is parsed into a datetime.date."""
        converted = _convert_record({"id": 1, "actdate": "2025-01-15", "name": "test"})
        assert converted["id"] == 1
        assert converted["actdate"] == date(2025, 1, 15)
        assert converted["name"] == "test"

    def test_converts_wf_load_dttm_string_to_datetime(self):
        """An ISO 'wf_load_dttm' string is parsed into a datetime.datetime."""
        converted = _convert_record({"id": 1, "wf_load_dttm": "2025-01-15T12:30:45"})
        assert converted["id"] == 1
        assert converted["wf_load_dttm"] == datetime(2025, 1, 15, 12, 30, 45)

    def test_keeps_non_date_fields_unchanged(self):
        """Fields other than the date columns pass through untouched."""
        original = {"id": 1, "name": "test", "value": 123.45}
        assert _convert_record(original) == original

    def test_handles_already_converted_dates(self):
        """Values that are already date/datetime objects are left as-is."""
        act_date = date(2025, 1, 15)
        load_dttm = datetime(2025, 1, 15, 12, 30, 45)
        converted = _convert_record(
            {"id": 1, "actdate": act_date, "wf_load_dttm": load_dttm}
        )
        assert converted["actdate"] == act_date
        assert converted["wf_load_dttm"] == load_dttm
@pytest.mark.unit
class TestLoadOpuPipeline:
    """Tests for the load_opu pipeline."""

    @pytest.mark.asyncio
    async def test_full_pipeline_success(self):
        """Happy path: export is started, awaited, downloaded, parsed and loaded.

        The GMAP2 interface, session factory, repository and application
        context are all mocked; a real zstd archive on disk is used so the
        parsing stage runs against genuine compressed JSONL data.
        """
        mock_interface = AsyncMock()
        mock_interface.start_export = AsyncMock(return_value="job123")
        mock_interface.wait_for_completion = AsyncMock(
            return_value=ExportJobStatus(
                job_id="job123",
                status="completed",
                total_rows=100,
            )
        )
        with tempfile.TemporaryDirectory() as tmpdir:
            archive_path = Path(tmpdir) / "test.jsonl.zst"
            records = [
                {"id": i, "actdate": "2025-01-15", "name": f"test{i}"} for i in range(10)
            ]
            content = b"\n".join(orjson.dumps(r) for r in records)
            archive_path.write_bytes(zstd.ZstdCompressor().compress(content))

            async def mock_download(job_id: str, output_path: Path) -> None:
                # Emulate downloading by copying the prepared archive.
                output_path.write_bytes(archive_path.read_bytes())

            mock_interface.download_export = AsyncMock(side_effect=mock_download)

            mock_session = AsyncMock()
            mock_sessionmaker = MagicMock()
            mock_sessionmaker.return_value.__aenter__ = AsyncMock(
                return_value=mock_session
            )
            # return_value=False is essential: a bare AsyncMock() resolves to a
            # truthy MagicMock, and a truthy __aexit__ result makes ``async with``
            # suppress any exception raised inside the block, hiding real
            # pipeline failures from this test.
            mock_sessionmaker.return_value.__aexit__ = AsyncMock(return_value=False)

            mock_repo = AsyncMock()
            mock_repo.truncate = AsyncMock()
            mock_repo.bulk_insert = AsyncMock(return_value=10)

            mock_app_ctx = MagicMock()
            mock_app_ctx.logger = MagicMock()
            mock_app_ctx.sessionmaker = mock_sessionmaker

            with (
                patch(
                    "dataloader.workers.pipelines.load_opu.get_gmap2brief_interface",
                    return_value=mock_interface,
                ),
                patch(
                    "dataloader.workers.pipelines.load_opu.OpuRepository",
                    return_value=mock_repo,
                ),
                patch("dataloader.workers.pipelines.load_opu.APP_CTX", mock_app_ctx),
            ):
                # Drain the pipeline generator; each yield marks one step.
                steps = []
                async for _ in load_opu({}):
                    steps.append("step")

            assert len(steps) >= 4
            mock_interface.start_export.assert_called_once()
            mock_interface.wait_for_completion.assert_called_once_with("job123")
            mock_interface.download_export.assert_called_once()
            mock_repo.truncate.assert_called_once()
            mock_repo.bulk_insert.assert_called()