ruwiki-test/tests/test_integration.py

422 lines
15 KiB
Python

import asyncio
import tempfile
from pathlib import Path
from datetime import datetime, timezone
from unittest.mock import AsyncMock, patch
import pytest
import pytest_asyncio
from src.dependency_injection import DependencyContainer
from src.models.article_dto import ArticleStatus
from src.sources import FileSource
from src.services import ArticleRepository, DatabaseService
class TestFileSourceIntegration:
    """Integration tests for reading article URLs through ``FileSource``."""

    @pytest.mark.asyncio
    async def test_read_urls_from_file(self, temp_input_file):
        """Commands read from the input file point at the wiki and are not forced."""
        file_source = FileSource(temp_input_file)

        collected = [item async for item in file_source.read_urls()]

        # The input fixture is expected to provide at least three URLs.
        assert len(collected) >= 3
        for item in collected:
            assert item.url.startswith("https://ru.ruwiki.ru/wiki/")
            assert item.force_reprocess is False

    @pytest.mark.asyncio
    async def test_count_urls(self, temp_input_file):
        """``count_urls`` reports at least three URLs for the input file."""
        total = await FileSource(temp_input_file).count_urls()
        assert total >= 3

    @pytest.mark.asyncio
    async def test_file_not_found(self):
        """Iterating a source backed by a missing file raises ``FileNotFoundError``."""
        missing_source = FileSource("nonexistent.txt")
        with pytest.raises(FileNotFoundError):
            async for _unused in missing_source.read_urls():
                pass
class TestDatabaseIntegration:
    """Integration tests for the database service (health check and raw connection)."""

    @pytest_asyncio.fixture
    async def clean_database(self, database_service: DatabaseService):
        """Expose the shared ``database_service`` fixture under a test-local name."""
        yield database_service

    # The asyncio marker is applied explicitly, as in the other test classes of
    # this module, so the coroutine tests are run as asyncio tests even when
    # pytest-asyncio is in strict mode (in auto mode the marker is a no-op).
    @pytest.mark.asyncio
    async def test_database_initialization(self, clean_database: DatabaseService):
        """The database service reports itself healthy."""
        health = await clean_database.health_check()
        assert health is True

    @pytest.mark.asyncio
    async def test_database_connection(self, clean_database: DatabaseService):
        """A connection obtained from the service can execute a trivial query."""
        async with await clean_database.get_connection() as conn:
            cursor = await conn.execute("SELECT 1")
            result = await cursor.fetchone()
            assert result[0] == 1
class TestRepositoryIntegration:
    """Integration tests for ``ArticleRepository`` CRUD, filtering and pagination.

    Every test is explicitly marked with ``pytest.mark.asyncio`` (as the other
    classes in this module do) so the coroutines are run as asyncio tests even
    when pytest-asyncio is in strict mode; in auto mode the marker is a no-op.
    """

    @pytest.mark.asyncio
    async def test_create_and_retrieve_article(self, repository: ArticleRepository):
        """A created article is PENDING and can be fetched by id and by URL."""
        article = await repository.create_article(
            url="https://ru.ruwiki.ru/wiki/Test",
            title="Test Article",
            raw_text="Test content",
        )
        assert article.id is not None
        assert article.url == "https://ru.ruwiki.ru/wiki/Test"
        assert article.title == "Test Article"
        assert article.status == ArticleStatus.PENDING

        retrieved = await repository.get_by_id(article.id)
        assert retrieved is not None
        assert retrieved.url == article.url
        assert retrieved.title == article.title

        retrieved_by_url = await repository.get_by_url(article.url)
        assert retrieved_by_url is not None
        assert retrieved_by_url.id == article.id

    @pytest.mark.asyncio
    async def test_update_article(self, repository: ArticleRepository):
        """Status and simplified text changes are persisted by ``update_article``."""
        article = await repository.create_article(
            url="https://ru.ruwiki.ru/wiki/Test",
            title="Test Article",
            raw_text="Test content",
        )
        article.status = ArticleStatus.SIMPLIFIED
        article.simplified_text = "Simplified content"

        updated_article = await repository.update_article(article)
        assert updated_article.status == ArticleStatus.SIMPLIFIED
        assert updated_article.simplified_text == "Simplified content"
        assert updated_article.updated_at is not None

        retrieved = await repository.get_by_id(article.id)
        # Fail with a clear assertion instead of an AttributeError if the
        # article unexpectedly cannot be read back.
        assert retrieved is not None
        assert retrieved.status == ArticleStatus.SIMPLIFIED
        assert retrieved.simplified_text == "Simplified content"

    @pytest.mark.asyncio
    async def test_get_articles_by_status(self, repository: ArticleRepository):
        """Filtering by status returns only the articles in that status."""
        article1 = await repository.create_article(
            url="https://ru.ruwiki.ru/wiki/Test1",
            title="Test 1",
            raw_text="Content 1",
        )
        article2 = await repository.create_article(
            url="https://ru.ruwiki.ru/wiki/Test2",
            title="Test 2",
            raw_text="Content 2",
        )
        article2.status = ArticleStatus.SIMPLIFIED
        await repository.update_article(article2)

        pending_articles = await repository.get_articles_by_status(ArticleStatus.PENDING)
        assert len(pending_articles) == 1
        assert pending_articles[0].id == article1.id

        simplified_articles = await repository.get_articles_by_status(ArticleStatus.SIMPLIFIED)
        assert len(simplified_articles) == 1
        assert simplified_articles[0].id == article2.id

    @pytest.mark.asyncio
    async def test_count_by_status(self, repository: ArticleRepository):
        """Per-status counts start at zero and follow article creation."""
        count = await repository.count_by_status(ArticleStatus.PENDING)
        assert count == 0

        await repository.create_article(
            url="https://ru.ruwiki.ru/wiki/Test1",
            title="Test 1",
            raw_text="Content 1",
        )
        await repository.create_article(
            url="https://ru.ruwiki.ru/wiki/Test2",
            title="Test 2",
            raw_text="Content 2",
        )

        pending_count = await repository.count_by_status(ArticleStatus.PENDING)
        assert pending_count == 2
        simplified_count = await repository.count_by_status(ArticleStatus.SIMPLIFIED)
        assert simplified_count == 0

    @pytest.mark.asyncio
    async def test_duplicate_url_prevention(self, repository: ArticleRepository):
        """Creating a second article with the same URL raises ``ValueError``."""
        await repository.create_article(
            url="https://ru.ruwiki.ru/wiki/Test",
            title="Test Article",
            raw_text="Test content",
        )
        with pytest.raises(ValueError, match="уже существует"):
            await repository.create_article(
                url="https://ru.ruwiki.ru/wiki/Test",
                title="Duplicate Article",
                raw_text="Different content",
            )

    @pytest.mark.asyncio
    async def test_get_all_articles_pagination(self, repository: ArticleRepository):
        """``limit``/``offset`` bound the page size and adjacent pages do not overlap."""
        for i in range(5):
            await repository.create_article(
                url=f"https://ru.ruwiki.ru/wiki/Test{i}",
                title=f"Test {i}",
                raw_text=f"Content {i}",
            )

        articles = await repository.get_all_articles(limit=3)
        assert len(articles) == 3

        articles_offset = await repository.get_all_articles(limit=2, offset=2)
        assert len(articles_offset) == 2

        first_two = await repository.get_all_articles(limit=2, offset=0)
        # Pages [0, 2) and [2, 4) must not share any article, not merely
        # differ in their first element.
        first_page_ids = {article.id for article in first_two}
        second_page_ids = {article.id for article in articles_offset}
        assert first_page_ids.isdisjoint(second_page_ids)

    @pytest.mark.asyncio
    async def test_delete_article(self, repository: ArticleRepository):
        """Deleting removes the article; a repeated delete reports ``False``."""
        article = await repository.create_article(
            url="https://ru.ruwiki.ru/wiki/Test",
            title="Test Article",
            raw_text="Test content",
        )

        deleted = await repository.delete_article(article.id)
        assert deleted is True

        retrieved = await repository.get_by_id(article.id)
        assert retrieved is None

        deleted_again = await repository.delete_article(article.id)
        assert deleted_again is False
class TestAsyncOperations:
    """Tests that exercise the repository from several coroutines at once."""

    @pytest.mark.asyncio
    async def test_concurrent_article_creation(self, repository: ArticleRepository):
        """Five articles created concurrently all succeed and get distinct ids."""

        async def create_article(i: int):
            return await repository.create_article(
                url=f"https://ru.ruwiki.ru/wiki/Test{i}",
                title=f"Test {i}",
                raw_text=f"Content {i}",
            )

        articles = await asyncio.gather(*(create_article(i) for i in range(5)))

        assert len(articles) == 5
        assert len({article.id for article in articles}) == 5

    @pytest.mark.asyncio
    async def test_concurrent_read_operations(
        self,
        multiple_articles_in_db,
        repository: ArticleRepository,
    ):
        """Articles already in the database can be read back concurrently.

        The previous version bound ``repository`` to a module-name string,
        defined a reader that was never called and asserted nothing, so it
        could not fail. The ``repository`` fixture is now requested directly.

        NOTE(review): this assumes ``multiple_articles_in_db`` stores its
        articles in the same database the ``repository`` fixture reads, and
        that each article exposes ``id`` -- confirm against conftest.
        """
        articles = multiple_articles_in_db
        assert articles, "fixture is expected to provide at least one article"

        async def read_article(article_id: int):
            return await repository.get_by_id(article_id)

        results = await asyncio.gather(
            *(read_article(article.id) for article in articles)
        )

        assert len(results) == len(articles)
        # gather() preserves argument order, so results line up with articles.
        for expected, actual in zip(articles, results):
            assert actual is not None
            assert actual.id == expected.id
class TestSystemIntegration:
    """End-to-end tests wiring the dependency container, runner and mocked adapters."""

    @staticmethod
    def _write_prompt_template(test_config) -> Path:
        """Write a minimal prompt template to a temp file and point the config at it.

        The caller is responsible for deleting the returned path.
        """
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".txt", delete=False, encoding="utf-8"
        ) as f:
            f.write("### role: user\n{wiki_source_text}")
        test_config.prompt_template_path = f.name
        return Path(f.name)

    @pytest.mark.asyncio
    async def test_dependency_container_initialization(self, test_config):
        """After ``initialize`` the container exposes all services and health keys."""
        container = DependencyContainer(test_config)
        try:
            await container.initialize()

            db_service = container.get_database_service()
            repository = container.get_repository()
            write_queue = container.get_write_queue()
            ruwiki_adapter = container.get_ruwiki_adapter()
            llm_adapter = container.get_llm_adapter()
            simplify_service = container.get_simplify_service()

            assert db_service is not None
            assert repository is not None
            assert write_queue is not None
            assert ruwiki_adapter is not None
            assert llm_adapter is not None
            assert simplify_service is not None

            checks = await container.health_check()
            assert "database" in checks
            assert "write_queue" in checks
        finally:
            await container.cleanup()

    @pytest.mark.asyncio
    async def test_runner_with_mocked_adapters(self, test_config, temp_input_file):
        """The runner processes articles when both adapters are replaced by mocks."""
        from src.adapters.ruwiki import WikiPageInfo

        container = DependencyContainer(test_config)
        try:
            await container.initialize()
            with (
                patch.object(container, "get_ruwiki_adapter") as mock_ruwiki,
                patch.object(container, "get_llm_adapter") as mock_llm,
            ):
                mock_ruwiki_instance = AsyncMock()
                mock_ruwiki.return_value = mock_ruwiki_instance
                mock_ruwiki_instance.fetch_page_cleaned.return_value = WikiPageInfo(
                    title="Тест",
                    content="Тестовый контент",
                )

                mock_llm_instance = AsyncMock()
                mock_llm.return_value = mock_llm_instance
                mock_llm_instance.simplify_text.return_value = ("Упрощённый текст", 100, 50)
                mock_llm_instance.count_tokens.return_value = 100

                template_path = self._write_prompt_template(test_config)
                try:
                    runner = container.create_runner(max_workers=2)
                    stats = await runner.run_from_file(
                        input_file=temp_input_file,
                        max_articles=2,
                    )
                    assert stats.total_processed >= 1
                    # NOTE(review): this holds for any non-negative counter, so
                    # it only guards against the attribute being missing.
                    assert stats.successful >= 0
                finally:
                    template_path.unlink(missing_ok=True)
        finally:
            await container.cleanup()

    @pytest.mark.asyncio
    async def test_error_handling_in_runner(self, test_config, temp_input_file):
        """A page-not-found error from the wiki adapter is counted as a failure."""
        from src.adapters import WikiPageNotFoundError

        container = DependencyContainer(test_config)
        try:
            await container.initialize()
            with patch.object(container, "get_ruwiki_adapter") as mock_ruwiki:
                mock_ruwiki_instance = AsyncMock()
                mock_ruwiki.return_value = mock_ruwiki_instance
                mock_ruwiki_instance.fetch_page_cleaned.side_effect = WikiPageNotFoundError(
                    "Страница не найдена"
                )

                runner = container.create_runner(max_workers=1)
                stats = await runner.run_from_file(
                    input_file=temp_input_file,
                    max_articles=1,
                )
                assert stats.total_processed >= 1
                assert stats.failed >= 1
                assert stats.success_rate < 100.0
        finally:
            await container.cleanup()

    @pytest.mark.asyncio
    async def test_concurrent_processing(self, test_config, temp_input_file):
        """Three articles with 0.1 s mocked delays finish within two seconds."""
        import time

        # Imported once here rather than on every mocked fetch call.
        from src.adapters.ruwiki import WikiPageInfo

        container = DependencyContainer(test_config)
        try:
            await container.initialize()
            with (
                patch.object(container, "get_ruwiki_adapter") as mock_ruwiki,
                patch.object(container, "get_llm_adapter") as mock_llm,
            ):

                async def delayed_fetch(*args, **kwargs):
                    await asyncio.sleep(0.1)
                    return WikiPageInfo(title="Тест", content="Контент")

                async def delayed_simplify(*args, **kwargs):
                    await asyncio.sleep(0.1)
                    return ("Упрощённый", 100, 50)

                mock_ruwiki_instance = AsyncMock()
                mock_ruwiki.return_value = mock_ruwiki_instance
                mock_ruwiki_instance.fetch_page_cleaned.side_effect = delayed_fetch

                mock_llm_instance = AsyncMock()
                mock_llm.return_value = mock_llm_instance
                mock_llm_instance.simplify_text.side_effect = delayed_simplify
                mock_llm_instance.count_tokens.return_value = 100

                template_path = self._write_prompt_template(test_config)
                try:
                    # perf_counter() is monotonic, so a wall-clock adjustment
                    # during the run cannot distort the measured duration.
                    start_time = time.perf_counter()
                    runner = container.create_runner(max_workers=3)
                    stats = await runner.run_from_file(
                        input_file=temp_input_file,
                        max_articles=3,
                    )
                    elapsed_time = time.perf_counter() - start_time

                    assert elapsed_time < 2.0
                    assert stats.total_processed >= 1
                finally:
                    template_path.unlink(missing_ok=True)
        finally:
            await container.cleanup()

    @pytest.mark.asyncio
    async def test_health_check_integration(self, test_config):
        """With healthy mocked adapters the container reports every check as True."""
        container = DependencyContainer(test_config)
        try:
            await container.initialize()
            with (
                patch.object(container, "get_ruwiki_adapter") as mock_ruwiki,
                patch.object(container, "get_llm_adapter") as mock_llm,
            ):
                mock_ruwiki_instance = AsyncMock()
                mock_ruwiki.return_value = mock_ruwiki_instance
                mock_ruwiki_instance.health_check.return_value = True

                mock_llm_instance = AsyncMock()
                mock_llm.return_value = mock_llm_instance
                mock_llm_instance.health_check.return_value = True

                checks = await container.health_check()
                assert checks["database"] is True
                assert checks["write_queue"] is True
                assert checks["ruwiki"] is True
                assert checks["llm"] is True
        finally:
            await container.cleanup()