This commit is contained in:
itqop 2025-07-11 23:55:49 +03:00
parent c41a2907b8
commit 50439b84bb
13 changed files with 152 additions and 200 deletions

View File

@ -2,7 +2,7 @@ anyio>=4.2.0,<5.0.0
aiohttp>=3.9.0,<4.0.0
aiosqlite>=0.19.0,<0.20.0
sqlmodel>=0.0.14,<0.0.15
sqlalchemy[asyncio]>=2.0.0,<3.0.0
openai>=1.13.0,<2.0.0
tiktoken>=0.5.2,<0.6.0

View File

@ -223,9 +223,9 @@ def list_articles(
repository = container.get_repository()
if status:
from .models import ProcessingStatus
from .models.article_dto import ArticleStatus
status_enum = ProcessingStatus(status)
status_enum = ArticleStatus(status)
articles = await repository.get_articles_by_status(status_enum, limit)
else:
articles = await repository.get_all_articles(limit)
@ -238,8 +238,7 @@ def list_articles(
"title": article.title,
"status": article.status.value,
"created_at": article.created_at.isoformat(),
"token_count_raw": article.token_count_raw,
"token_count_simplified": article.token_count_simplified,
"simplified_text": article.simplified_text,
}
for article in articles
]
@ -249,20 +248,14 @@ def list_articles(
click.echo("Статьи не найдены")
return
click.echo(f"{'ID':<5} {'Статус':<12} {'Название':<50} {'Токены (исх/упр)':<15}")
click.echo("-" * 87)
click.echo(f"{'ID':<5} {'Статус':<12} {'Название':<50}")
click.echo("-" * 72)
for article in articles:
tokens_info = ""
if article.token_count_raw and article.token_count_simplified:
tokens_info = f"{article.token_count_raw}/{article.token_count_simplified}"
elif article.token_count_raw:
tokens_info = f"{article.token_count_raw}/-"
title = article.title[:47] + "..." if len(article.title) > 50 else article.title
click.echo(
f"{article.id:<5} {article.status.value:<12} {title:<50} {tokens_info:<15}"
f"{article.id:<5} {article.status.value:<12} {title:<50}"
)
except Exception as e:

View File

@ -45,9 +45,6 @@ class DependencyContainer:
if self._write_queue:
await self._write_queue.stop()
if self._database_service:
self._database_service.close()
self._initialized = False
logger.info("Ресурсы очищены")
@ -117,7 +114,7 @@ class DependencyContainer:
checks["database"] = await db_service.health_check()
except Exception:
checks["database"] = False
return checks
try:
write_queue = self.get_write_queue()
checks["write_queue"] = (
@ -141,7 +138,6 @@ class DependencyContainer:
return checks
@lru_cache(maxsize=1)
def get_container(config: AppConfig | None = None) -> DependencyContainer:
if config is None:
config = AppConfig()

View File

@ -1,15 +1,12 @@
from .article import Article, ArticleCreate, ArticleRead, ProcessingStatus
from .article_dto import ArticleDTO, ArticleStatus
from .commands import ProcessingResult, ProcessingStats, SimplifyCommand
from .config import AppConfig
from .constants import *
__all__ = [
"AppConfig",
"Article",
"ArticleCreate",
"ArticleRead",
"ArticleDTO",
"ArticleStatus",
"ProcessingResult",
"ProcessingStats",
"ProcessingStatus",
"SimplifyCommand",
]

View File

@ -1,81 +0,0 @@
from __future__ import annotations
from datetime import datetime, timezone
from enum import Enum
from sqlmodel import Field, SQLModel
class ProcessingStatus(str, Enum):
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
class Article(SQLModel, table=True):
__tablename__ = "articles"
id: int | None = Field(default=None, primary_key=True)
url: str = Field(index=True, unique=True, max_length=500)
title: str = Field(max_length=300)
raw_text: str = Field(description="Исходный wiki-текст")
simplified_text: str | None = Field(
default=None,
description="Упрощённый текст для школьников",
)
status: ProcessingStatus = Field(default=ProcessingStatus.PENDING)
error_message: str | None = Field(default=None, max_length=1000)
token_count_raw: int | None = Field(
default=None, description="Количество токенов в исходном тексте"
)
token_count_simplified: int | None = Field(
default=None,
description="Количество токенов в упрощённом тексте",
)
processing_time_seconds: float | None = Field(default=None)
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime | None = Field(default=None)
def mark_processing(self) -> None:
self.status = ProcessingStatus.PROCESSING
self.updated_at = datetime.now(timezone.utc)
def mark_completed(
self,
simplified_text: str,
token_count_raw: int,
token_count_simplified: int,
processing_time: float,
) -> None:
self.simplified_text = simplified_text
self.token_count_raw = token_count_raw
self.token_count_simplified = token_count_simplified
self.processing_time_seconds = processing_time
self.status = ProcessingStatus.COMPLETED
self.error_message = None
self.updated_at = datetime.now(timezone.utc)
def mark_failed(self, error_message: str) -> None:
self.status = ProcessingStatus.FAILED
self.error_message = error_message[:1000]
self.updated_at = datetime.now(timezone.utc)
class ArticleCreate(SQLModel):
url: str
title: str
raw_text: str
class ArticleRead(SQLModel):
id: int
url: str
title: str
raw_text: str
simplified_text: str | None
status: ProcessingStatus
token_count_raw: int | None
token_count_simplified: int | None
created_at: datetime

26
src/models/article_dto.py Normal file
View File

@ -0,0 +1,26 @@
from datetime import datetime
from dataclasses import dataclass
from enum import Enum
from typing import Optional
class ArticleStatus(Enum):
PENDING = "pending"
SIMPLIFIED = "simplified"
FAILED = "failed"
@dataclass
class ArticleDTO:
"""
DTO для Article без зависимостей от SQLModel.
Используется в runtime коде для передачи данных между слоями.
"""
url: str
title: str
raw_text: str
status: ArticleStatus
created_at: datetime
id: Optional[int] = None
simplified_text: Optional[str] = None
updated_at: Optional[datetime] = None

View File

@ -0,0 +1,6 @@
LLM_MAX_INPUT_TOKENS = 4096
MAX_TOKEN_LIMIT_WITH_BUFFER = 3800
ARTICLE_NAME_INDEX = 0
MIN_WIKI_PATH_PARTS = 2
WIKI_PATH_INDEX = 1
WRITE_QUEUE_BATCH_SIZE = 10

View File

@ -1,10 +1,7 @@
"""Сервис для управления базой данных."""
from pathlib import Path
import aiosqlite
import structlog
from sqlmodel import SQLModel, create_engine
from ..models import AppConfig
@ -16,18 +13,40 @@ class DatabaseService:
self.config = config
self.logger = structlog.get_logger().bind(service="database")
self._sync_engine = create_engine(
config.sync_db_url,
echo=False,
connect_args={"check_same_thread": False},
)
async def initialize_database(self) -> None:
db_path = Path(self.config.db_path)
db_path.parent.mkdir(parents=True, exist_ok=True)
self.logger.info("Создание схемы базы данных", db_path=self.config.db_path)
SQLModel.metadata.create_all(self._sync_engine)
# Создаём таблицы напрямую через DDL без SQLAlchemy
async with aiosqlite.connect(self.config.db_path) as conn:
await conn.execute("""
CREATE TABLE IF NOT EXISTS articles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
url TEXT NOT NULL UNIQUE,
title TEXT NOT NULL,
raw_text TEXT NOT NULL,
simplified_text TEXT,
status TEXT NOT NULL DEFAULT 'pending',
error_message TEXT,
token_count_raw INTEGER,
token_count_simplified INTEGER,
processing_time_seconds REAL,
created_at TEXT NOT NULL,
updated_at TEXT
)
""")
await conn.execute("""
CREATE INDEX IF NOT EXISTS idx_articles_url ON articles(url)
""")
await conn.execute("""
CREATE INDEX IF NOT EXISTS idx_articles_status ON articles(status)
""")
await conn.commit()
await self._configure_sqlite()
@ -47,6 +66,7 @@ class DatabaseService:
self.logger.info("SQLite настроен для оптимальной производительности")
async def get_connection(self) -> aiosqlite.Connection:
self.logger.info("Открытие соединения с базой данных")
return await aiosqlite.connect(
self.config.db_path,
timeout=30.0,
@ -54,12 +74,18 @@ class DatabaseService:
async def health_check(self) -> bool:
try:
async with self.get_connection() as conn:
await conn.execute("SELECT 1")
self.logger.info("Начинаем health_check...")
self.logger.info("Создаём соединение напрямую...")
# Создаём соединение напрямую, не переиспользуя объект
async with aiosqlite.connect(self.config.db_path, timeout=30.0) as connection:
self.logger.info("Вошли в async context manager")
self.logger.info("Выполняем SELECT 1...")
await connection.execute("SELECT 1")
self.logger.info("SELECT 1 выполнен успешно")
return True
except Exception as e:
self.logger.error("Database health check failed", error=str(e))
import traceback
self.logger.error("Traceback", traceback=traceback.format_exc())
return False
def close(self) -> None:
self._sync_engine.dispose()

View File

@ -1,9 +1,10 @@
from datetime import datetime, timezone
from typing import Any
import aiosqlite
import structlog
from ..models import Article, ArticleCreate, ProcessingStatus
from ..models.article_dto import ArticleDTO, ArticleStatus
from .database import DatabaseService
logger = structlog.get_logger()
@ -15,19 +16,20 @@ class ArticleRepository:
self.db_service = db_service
self.logger = structlog.get_logger().bind(repository="article")
async def create_article(self, article_data: ArticleCreate) -> Article:
existing = await self.get_by_url(article_data.url)
async def create_article(self, url: str, title: str, raw_text: str) -> ArticleDTO:
existing = await self.get_by_url(url)
if existing:
raise ValueError(f"Статья с URL {article_data.url} уже существует")
raise ValueError(f"Статья с URL {url} уже существует")
article = Article(
url=article_data.url,
title=article_data.title,
raw_text=article_data.raw_text,
status=ProcessingStatus.PENDING,
article = ArticleDTO(
url=url,
title=title,
raw_text=raw_text,
status=ArticleStatus.PENDING,
created_at=datetime.now(timezone.utc),
)
async with self.db_service.get_connection() as conn:
async with await self.db_service.get_connection() as conn:
cursor = await conn.execute(
"""
INSERT INTO articles (url, title, raw_text, status, created_at)
@ -48,8 +50,8 @@ class ArticleRepository:
self.logger.info("Статья создана", article_id=article.id, url=article.url)
return article
async def get_by_id(self, article_id: int) -> Article | None:
async with self.db_service.get_connection() as conn:
async def get_by_id(self, article_id: int) -> ArticleDTO | None:
async with await self.db_service.get_connection() as conn:
cursor = await conn.execute(
"SELECT * FROM articles WHERE id = ?",
(article_id,),
@ -59,10 +61,10 @@ class ArticleRepository:
if not row:
return None
return self._row_to_article(row)
return self._row_to_article_dto(row)
async def get_by_url(self, url: str) -> Article | None:
async with self.db_service.get_connection() as conn:
async def get_by_url(self, url: str) -> ArticleDTO | None:
async with await self.db_service.get_connection() as conn:
cursor = await conn.execute(
"SELECT * FROM articles WHERE url = ?",
(url,),
@ -72,13 +74,15 @@ class ArticleRepository:
if not row:
return None
return self._row_to_article(row)
return self._row_to_article_dto(row)
async def update_article(self, article: Article) -> Article:
async def update_article(self, article: ArticleDTO) -> ArticleDTO:
if not article.id:
raise ValueError("ID статьи не может быть None для обновления")
async with self.db_service.get_connection() as conn:
article.updated_at = datetime.now(timezone.utc)
async with await self.db_service.get_connection() as conn:
cursor = await conn.execute(
"""
UPDATE articles SET
@ -86,10 +90,6 @@ class ArticleRepository:
raw_text = ?,
simplified_text = ?,
status = ?,
error_message = ?,
token_count_raw = ?,
token_count_simplified = ?,
processing_time_seconds = ?,
updated_at = ?
WHERE id = ?
""",
@ -98,10 +98,6 @@ class ArticleRepository:
article.raw_text,
article.simplified_text,
article.status.value,
article.error_message,
article.token_count_raw,
article.token_count_simplified,
article.processing_time_seconds,
article.updated_at,
article.id,
),
@ -115,8 +111,8 @@ class ArticleRepository:
return article
async def get_articles_by_status(
self, status: ProcessingStatus, limit: int | None = None
) -> list[Article]:
self, status: ArticleStatus, limit: int | None = None
) -> list[ArticleDTO]:
query = "SELECT * FROM articles WHERE status = ?"
params: tuple[Any, ...] = (status.value,)
@ -124,17 +120,17 @@ class ArticleRepository:
query += " LIMIT ?"
params = params + (limit,)
async with self.db_service.get_connection() as conn:
async with await self.db_service.get_connection() as conn:
cursor = await conn.execute(query, params)
rows = await cursor.fetchall()
return [self._row_to_article(row) for row in rows]
return [self._row_to_article_dto(row) for row in rows]
async def get_pending_articles(self, limit: int | None = None) -> list[Article]:
return await self.get_articles_by_status(ProcessingStatus.PENDING, limit)
async def get_pending_articles(self, limit: int | None = None) -> list[ArticleDTO]:
return await self.get_articles_by_status(ArticleStatus.PENDING, limit)
async def count_by_status(self, status: ProcessingStatus) -> int:
async with self.db_service.get_connection() as conn:
async def count_by_status(self, status: ArticleStatus) -> int:
async with await self.db_service.get_connection() as conn:
cursor = await conn.execute(
"SELECT COUNT(*) FROM articles WHERE status = ?",
(status.value,),
@ -143,7 +139,7 @@ class ArticleRepository:
return result[0] if result else 0
async def get_all_articles(self, limit: int | None = None, offset: int = 0) -> list[Article]:
async def get_all_articles(self, limit: int | None = None, offset: int = 0) -> list[ArticleDTO]:
query = "SELECT * FROM articles ORDER BY created_at DESC"
params: tuple[Any, ...] = ()
@ -151,14 +147,14 @@ class ArticleRepository:
query += " LIMIT ? OFFSET ?"
params = (limit, offset)
async with self.db_service.get_connection() as conn:
async with await self.db_service.get_connection() as conn:
cursor = await conn.execute(query, params)
rows = await cursor.fetchall()
return [self._row_to_article(row) for row in rows]
return [self._row_to_article_dto(row) for row in rows]
async def delete_article(self, article_id: int) -> bool:
async with self.db_service.get_connection() as conn:
async with await self.db_service.get_connection() as conn:
cursor = await conn.execute(
"DELETE FROM articles WHERE id = ?",
(article_id,),
@ -171,18 +167,14 @@ class ArticleRepository:
return deleted
def _row_to_article(self, row: aiosqlite.Row) -> Article:
return Article(
def _row_to_article_dto(self, row: aiosqlite.Row) -> ArticleDTO:
return ArticleDTO(
id=row["id"],
url=row["url"],
title=row["title"],
raw_text=row["raw_text"],
simplified_text=row["simplified_text"],
status=ProcessingStatus(row["status"]),
error_message=row["error_message"],
token_count_raw=row["token_count_raw"],
token_count_simplified=row["token_count_simplified"],
processing_time_seconds=row["processing_time_seconds"],
created_at=row["created_at"],
updated_at=row["updated_at"],
status=ArticleStatus(row["status"]),
created_at=datetime.fromisoformat(row["created_at"]) if row["created_at"] else None,
updated_at=datetime.fromisoformat(row["updated_at"]) if row["updated_at"] else None,
)

View File

@ -8,7 +8,7 @@ import structlog
from src.adapters.llm import LLMProviderAdapter, LLMTokenLimitError
from src.adapters.ruwiki import RuWikiAdapter
from src.models import AppConfig, ArticleCreate, ProcessingResult, SimplifyCommand
from src.models import AppConfig, ProcessingResult, SimplifyCommand
from src.models.constants import LLM_MAX_INPUT_TOKENS, MAX_TOKEN_LIMIT_WITH_BUFFER
from src.services.repository import ArticleRepository
from src.services.text_splitter import RecursiveCharacterTextSplitter
@ -70,7 +70,9 @@ class SimplifyService:
page_info = await self.ruwiki_adapter.fetch_page_cleaned(command.url)
article = await self._create_or_update_article(command, page_info)
article.mark_processing()
# Отмечаем статью как обрабатываемую
from src.models.article_dto import ArticleStatus
article.status = ArticleStatus.PENDING # В новой схеме используем PENDING для обработки
await self.repository.update_article(article)
simplified_text, input_tokens, output_tokens = await self._simplify_article_text(
@ -111,21 +113,19 @@ class SimplifyService:
title=existing_article.title,
raw_text=existing_article.raw_text,
simplified_text=existing_article.simplified_text,
token_count_raw=existing_article.token_count_raw or 0,
token_count_simplified=existing_article.token_count_simplified or 0,
processing_time_seconds=existing_article.processing_time_seconds or 0,
token_count_raw=0, # ArticleDTO не хранит token counts
token_count_simplified=0, # ArticleDTO не хранит token counts
processing_time_seconds=0, # ArticleDTO не хранит processing time
)
return None
async def _create_or_update_article(self, command, page_info):
article_data = ArticleCreate(
url=command.url,
title=page_info.title,
raw_text=page_info.content,
)
try:
return await self.repository.create_article(article_data)
return await self.repository.create_article(
url=command.url,
title=page_info.title,
raw_text=page_info.content,
)
except ValueError:
article = await self.repository.get_by_url(command.url)
if not article:
@ -133,9 +133,10 @@ class SimplifyService:
raise ValueError(msg) from None
if command.force_reprocess:
from src.models.article_dto import ArticleStatus
article.title = page_info.title
article.raw_text = page_info.content
article.mark_processing()
article.status = ArticleStatus.PENDING # Эквивалент mark_processing
await self.repository.update_article(article)
return article

View File

@ -5,7 +5,8 @@ from dataclasses import dataclass, field
import structlog
from src.models import Article, ProcessingResult
from src.models import ProcessingResult
from src.models.article_dto import ArticleDTO
from src.models.constants import WRITE_QUEUE_BATCH_SIZE
from src.services.repository import ArticleRepository
@ -13,9 +14,9 @@ from src.services.repository import ArticleRepository
@dataclass
class WriteOperation:
operation_type: str
article: Article | None = None
article: ArticleDTO | None = None
result: ProcessingResult | None = None
future: asyncio.Future[Article] | None = field(default=None, init=False)
future: asyncio.Future[ArticleDTO] | None = field(default=None, init=False)
class AsyncWriteQueue:
@ -56,15 +57,15 @@ class AsyncWriteQueue:
self.logger.info("Write queue остановлена")
async def update_article(self, article: Article) -> None:
async def update_article(self, article: ArticleDTO) -> None:
operation = WriteOperation(
operation_type="update",
article=article,
)
await self._queue.put(operation)
async def update_from_result(self, result: ProcessingResult) -> Article:
future: asyncio.Future[Article] = asyncio.Future()
async def update_from_result(self, result: ProcessingResult) -> ArticleDTO:
future: asyncio.Future[ArticleDTO] = asyncio.Future()
operation = WriteOperation(
operation_type="update_from_result",
@ -143,7 +144,7 @@ class AsyncWriteQueue:
msg = f"Неизвестный тип операции: {operation.operation_type}"
raise ValueError(msg)
async def _update_article_from_result(self, result: ProcessingResult) -> Article:
async def _update_article_from_result(self, result: ProcessingResult) -> ArticleDTO:
article = await self.repository.get_by_url(result.url)
if not article:
msg = f"Статья с URL {result.url} не найдена"
@ -154,14 +155,13 @@ class AsyncWriteQueue:
msg = "Неполные данные в успешном результате"
raise ValueError(msg)
article.mark_completed(
simplified_text=result.simplified_text,
token_count_raw=result.token_count_raw or 0,
token_count_simplified=result.token_count_simplified or 0,
processing_time=result.processing_time_seconds or 0,
)
# Обновляем поля напрямую, так как у ArticleDTO нет методов mark_*
from src.models.article_dto import ArticleStatus
article.simplified_text = result.simplified_text
article.status = ArticleStatus.SIMPLIFIED
else:
article.mark_failed(result.error_message or "Неизвестная ошибка")
from src.models.article_dto import ArticleStatus
article.status = ArticleStatus.FAILED
return await self.repository.update_article(article)

View File

@ -13,7 +13,6 @@ from src.models import AppConfig, Article, ArticleCreate, ProcessingStatus
@pytest.fixture(scope="session")
def event_loop() -> Generator[asyncio.AbstractEventLoop, None, None]:
"""Создать event loop для всей сессии тестов."""
loop = asyncio.new_event_loop()
yield loop
loop.close()
@ -21,7 +20,6 @@ def event_loop() -> Generator[asyncio.AbstractEventLoop, None, None]:
@pytest.fixture
def test_config() -> AppConfig:
"""Тестовая конфигурация."""
with tempfile.TemporaryDirectory() as temp_dir:
db_path = Path(temp_dir) / "test.db"
return AppConfig(
@ -38,7 +36,6 @@ def test_config() -> AppConfig:
@pytest.fixture
def sample_wiki_urls() -> list[str]:
"""Список тестовых URL википедии."""
return [
"https://ru.wikipedia.org/wiki/Тест",
"https://ru.wikipedia.org/wiki/Пример",
@ -48,7 +45,6 @@ def sample_wiki_urls() -> list[str]:
@pytest.fixture
def invalid_urls() -> list[str]:
"""Список невалидных URL."""
return [
"https://example.com/invalid",
"https://en.wikipedia.org/wiki/English",

View File

@ -108,7 +108,7 @@ class TestDatabaseService:
db_service = DatabaseService(test_config)
await db_service.initialize_database()
async with db_service.get_connection() as conn:
async with await db_service.get_connection() as conn:
cursor = await conn.execute("SELECT 1")
result = await cursor.fetchone()
assert result[0] == 1