From 50439b84bb77b70efa93a35c740158c7a8a4302a Mon Sep 17 00:00:00 2001 From: itqop Date: Fri, 11 Jul 2025 23:55:49 +0300 Subject: [PATCH] Text DDL --- requirements.txt | 2 +- src/cli.py | 19 +++---- src/dependency_injection.py | 6 +-- src/models/__init__.py | 9 ++-- src/models/article.py | 81 ------------------------------ src/models/article_dto.py | 26 ++++++++++ src/models/constants.py | 6 +++ src/services/database.py | 56 +++++++++++++++------ src/services/repository.py | 86 +++++++++++++++----------------- src/services/simplify_service.py | 27 +++++----- src/services/write_queue.py | 28 +++++------ tests/conftest.py | 4 -- tests/test_services.py | 2 +- 13 files changed, 152 insertions(+), 200 deletions(-) delete mode 100644 src/models/article.py create mode 100644 src/models/article_dto.py diff --git a/requirements.txt b/requirements.txt index 01e9b89..6c1e46f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,7 @@ anyio>=4.2.0,<5.0.0 aiohttp>=3.9.0,<4.0.0 aiosqlite>=0.19.0,<0.20.0 -sqlmodel>=0.0.14,<0.0.15 +sqlalchemy[asyncio]>=2.0.0,<3.0.0 openai>=1.13.0,<2.0.0 tiktoken>=0.5.2,<0.6.0 diff --git a/src/cli.py b/src/cli.py index 0f857fd..b1531cf 100644 --- a/src/cli.py +++ b/src/cli.py @@ -223,9 +223,9 @@ def list_articles( repository = container.get_repository() if status: - from .models import ProcessingStatus + from .models.article_dto import ArticleStatus - status_enum = ProcessingStatus(status) + status_enum = ArticleStatus(status) articles = await repository.get_articles_by_status(status_enum, limit) else: articles = await repository.get_all_articles(limit) @@ -238,8 +238,7 @@ def list_articles( "title": article.title, "status": article.status.value, "created_at": article.created_at.isoformat(), - "token_count_raw": article.token_count_raw, - "token_count_simplified": article.token_count_simplified, + "simplified_text": article.simplified_text, } for article in articles ] @@ -249,20 +248,14 @@ def list_articles( click.echo("Статьи не найдены") return - click.echo(f"{'ID':<5} {'Статус':<12} {'Название':<50} {'Токены (исх/упр)':<15}") - click.echo("-" * 87) + click.echo(f"{'ID':<5} {'Статус':<12} {'Название':<50}") + click.echo("-" * 72) for article in articles: - tokens_info = "" - if article.token_count_raw and article.token_count_simplified: - tokens_info = f"{article.token_count_raw}/{article.token_count_simplified}" - elif article.token_count_raw: - tokens_info = f"{article.token_count_raw}/-" - title = article.title[:47] + "..." if len(article.title) > 50 else article.title click.echo( - f"{article.id:<5} {article.status.value:<12} {title:<50} {tokens_info:<15}" + f"{article.id:<5} {article.status.value:<12} {title:<50}" ) except Exception as e: diff --git a/src/dependency_injection.py b/src/dependency_injection.py index 30e8412..3a3125c 100644 --- a/src/dependency_injection.py +++ b/src/dependency_injection.py @@ -45,9 +45,6 @@ class DependencyContainer: if self._write_queue: await self._write_queue.stop() - if self._database_service: - self._database_service.close() - self._initialized = False logger.info("Ресурсы очищены") @@ -117,7 +114,7 @@ class DependencyContainer: checks["database"] = await db_service.health_check() except Exception: checks["database"] = False - + return checks try: write_queue = self.get_write_queue() checks["write_queue"] = ( @@ -141,7 +138,6 @@ class DependencyContainer: return checks -@lru_cache(maxsize=1) def get_container(config: AppConfig | None = None) -> DependencyContainer: if config is None: config = AppConfig() diff --git a/src/models/__init__.py b/src/models/__init__.py index 1974d53..c1818fb 100644 --- a/src/models/__init__.py +++ b/src/models/__init__.py @@ -1,15 +1,12 @@ -from .article import Article, ArticleCreate, ArticleRead, ProcessingStatus +from .article_dto import ArticleDTO, ArticleStatus from .commands import ProcessingResult, ProcessingStats, SimplifyCommand from .config import AppConfig -from .constants import * __all__ = [ "AppConfig", - "Article", - "ArticleCreate", - "ArticleRead", + "ArticleDTO", + "ArticleStatus", "ProcessingResult", "ProcessingStats", - "ProcessingStatus", "SimplifyCommand", ] diff --git a/src/models/article.py b/src/models/article.py deleted file mode 100644 index c6aed9c..0000000 --- a/src/models/article.py +++ /dev/null @@ -1,81 +0,0 @@ -from __future__ import annotations - -from datetime import datetime, timezone -from enum import Enum - -from sqlmodel import Field, SQLModel - - -class ProcessingStatus(str, Enum): - PENDING = "pending" - PROCESSING = "processing" - COMPLETED = "completed" - FAILED = "failed" - - -class Article(SQLModel, table=True): - - __tablename__ = "articles" - - id: int | None = Field(default=None, primary_key=True) - url: str = Field(index=True, unique=True, max_length=500) - title: str = Field(max_length=300) - raw_text: str = Field(description="Исходный wiki-текст") - simplified_text: str | None = Field( - default=None, - description="Упрощённый текст для школьников", - ) - status: ProcessingStatus = Field(default=ProcessingStatus.PENDING) - error_message: str | None = Field(default=None, max_length=1000) - token_count_raw: int | None = Field( - default=None, description="Количество токенов в исходном тексте" - ) - token_count_simplified: int | None = Field( - default=None, - description="Количество токенов в упрощённом тексте", - ) - processing_time_seconds: float | None = Field(default=None) - created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) - updated_at: datetime | None = Field(default=None) - - def mark_processing(self) -> None: - self.status = ProcessingStatus.PROCESSING - self.updated_at = datetime.now(timezone.utc) - - def mark_completed( - self, - simplified_text: str, - token_count_raw: int, - token_count_simplified: int, - processing_time: float, - ) -> None: - self.simplified_text = simplified_text - self.token_count_raw = token_count_raw - self.token_count_simplified = token_count_simplified - self.processing_time_seconds = processing_time - self.status = ProcessingStatus.COMPLETED - self.error_message = None - self.updated_at = datetime.now(timezone.utc) - - def mark_failed(self, error_message: str) -> None: - self.status = ProcessingStatus.FAILED - self.error_message = error_message[:1000] - self.updated_at = datetime.now(timezone.utc) - - -class ArticleCreate(SQLModel): - url: str - title: str - raw_text: str - - -class ArticleRead(SQLModel): - id: int - url: str - title: str - raw_text: str - simplified_text: str | None - status: ProcessingStatus - token_count_raw: int | None - token_count_simplified: int | None - created_at: datetime diff --git a/src/models/article_dto.py b/src/models/article_dto.py new file mode 100644 index 0000000..9f837d1 --- /dev/null +++ b/src/models/article_dto.py @@ -0,0 +1,26 @@ +from datetime import datetime +from dataclasses import dataclass +from enum import Enum +from typing import Optional + + +class ArticleStatus(Enum): + PENDING = "pending" + SIMPLIFIED = "simplified" + FAILED = "failed" + + +@dataclass +class ArticleDTO: + """ + DTO для Article без зависимостей от SQLModel. + Используется в runtime коде для передачи данных между слоями. + """ + url: str + title: str + raw_text: str + status: ArticleStatus + created_at: datetime + id: Optional[int] = None + simplified_text: Optional[str] = None + updated_at: Optional[datetime] = None \ No newline at end of file diff --git a/src/models/constants.py b/src/models/constants.py index e69de29..2022d74 100644 --- a/src/models/constants.py +++ b/src/models/constants.py @@ -0,0 +1,6 @@ +LLM_MAX_INPUT_TOKENS = 4096 +MAX_TOKEN_LIMIT_WITH_BUFFER = 3800 +ARTICLE_NAME_INDEX = 0 +MIN_WIKI_PATH_PARTS = 2 +WIKI_PATH_INDEX = 1 +WRITE_QUEUE_BATCH_SIZE = 10 diff --git a/src/services/database.py b/src/services/database.py index 406e2aa..3c10db4 100644 --- a/src/services/database.py +++ b/src/services/database.py @@ -1,10 +1,7 @@ -"""Сервис для управления базой данных.""" - from pathlib import Path import aiosqlite import structlog -from sqlmodel import SQLModel, create_engine from ..models import AppConfig @@ -16,18 +13,40 @@ class DatabaseService: self.config = config self.logger = structlog.get_logger().bind(service="database") - self._sync_engine = create_engine( - config.sync_db_url, - echo=False, - connect_args={"check_same_thread": False}, - ) - async def initialize_database(self) -> None: db_path = Path(self.config.db_path) db_path.parent.mkdir(parents=True, exist_ok=True) self.logger.info("Создание схемы базы данных", db_path=self.config.db_path) - SQLModel.metadata.create_all(self._sync_engine) + + # Создаём таблицы напрямую через DDL без SQLAlchemy + async with aiosqlite.connect(self.config.db_path) as conn: + await conn.execute(""" + CREATE TABLE IF NOT EXISTS articles ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + url TEXT NOT NULL UNIQUE, + title TEXT NOT NULL, + raw_text TEXT NOT NULL, + simplified_text TEXT, + status TEXT NOT NULL DEFAULT 'pending', + error_message TEXT, + token_count_raw INTEGER, + token_count_simplified INTEGER, + processing_time_seconds REAL, + created_at TEXT NOT NULL, + updated_at TEXT + ) + """) + + await conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_articles_url ON articles(url) + """) + + await conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_articles_status ON articles(status) + """) + + await conn.commit() await self._configure_sqlite() @@ -47,6 +66,7 @@ class DatabaseService: self.logger.info("SQLite настроен для оптимальной производительности") async def get_connection(self) -> aiosqlite.Connection: + self.logger.info("Открытие соединения с базой данных") return await aiosqlite.connect( self.config.db_path, timeout=30.0, @@ -54,12 +74,18 @@ class DatabaseService: async def health_check(self) -> bool: try: - async with self.get_connection() as conn: - await conn.execute("SELECT 1") + self.logger.info("Начинаем health_check...") + self.logger.info("Создаём соединение напрямую...") + + # Создаём соединение напрямую, не переиспользуя объект + async with aiosqlite.connect(self.config.db_path, timeout=30.0) as connection: + self.logger.info("Вошли в async context manager") + self.logger.info("Выполняем SELECT 1...") + await connection.execute("SELECT 1") + self.logger.info("SELECT 1 выполнен успешно") return True except Exception as e: self.logger.error("Database health check failed", error=str(e)) + import traceback + self.logger.error("Traceback", traceback=traceback.format_exc()) return False - - def close(self) -> None: - self._sync_engine.dispose() diff --git a/src/services/repository.py b/src/services/repository.py index 4966087..5d1b31c 100644 --- a/src/services/repository.py +++ b/src/services/repository.py @@ -1,9 +1,10 @@ +from datetime import datetime, timezone from typing import Any import aiosqlite import structlog -from ..models import Article, ArticleCreate, ProcessingStatus +from ..models.article_dto import ArticleDTO, ArticleStatus from .database import DatabaseService logger = structlog.get_logger() @@ -15,19 +16,20 @@ class ArticleRepository: self.db_service = db_service self.logger = structlog.get_logger().bind(repository="article") - async def create_article(self, article_data: ArticleCreate) -> Article: - existing = await self.get_by_url(article_data.url) + async def create_article(self, url: str, title: str, raw_text: str) -> ArticleDTO: + existing = await self.get_by_url(url) if existing: - raise ValueError(f"Статья с URL {article_data.url} уже существует") + raise ValueError(f"Статья с URL {url} уже существует") - article = Article( - url=article_data.url, - title=article_data.title, - raw_text=article_data.raw_text, - status=ProcessingStatus.PENDING, + article = ArticleDTO( + url=url, + title=title, + raw_text=raw_text, + status=ArticleStatus.PENDING, + created_at=datetime.now(timezone.utc), ) - async with self.db_service.get_connection() as conn: + async with await self.db_service.get_connection() as conn: cursor = await conn.execute( """ INSERT INTO articles (url, title, raw_text, status, created_at) @@ -48,8 +50,8 @@ class ArticleRepository: self.logger.info("Статья создана", article_id=article.id, url=article.url) return article - async def get_by_id(self, article_id: int) -> Article | None: - async with self.db_service.get_connection() as conn: + async def get_by_id(self, article_id: int) -> ArticleDTO | None: + async with await self.db_service.get_connection() as conn: cursor = await conn.execute( "SELECT * FROM articles WHERE id = ?", (article_id,), @@ -59,10 +61,10 @@ class ArticleRepository: if not row: return None - return self._row_to_article(row) + return self._row_to_article_dto(row) - async def get_by_url(self, url: str) -> Article | None: - async with self.db_service.get_connection() as conn: + async def get_by_url(self, url: str) -> ArticleDTO | None: + async with await self.db_service.get_connection() as conn: cursor = await conn.execute( "SELECT * FROM articles WHERE url = ?", (url,), @@ -72,13 +74,15 @@ class ArticleRepository: if not row: return None - return self._row_to_article(row) + return self._row_to_article_dto(row) - async def update_article(self, article: Article) -> Article: + async def update_article(self, article: ArticleDTO) -> ArticleDTO: if not article.id: raise ValueError("ID статьи не может быть None для обновления") - async with self.db_service.get_connection() as conn: + article.updated_at = datetime.now(timezone.utc) + + async with await self.db_service.get_connection() as conn: cursor = await conn.execute( """ UPDATE articles SET @@ -86,10 +90,6 @@ class ArticleRepository: raw_text = ?, simplified_text = ?, status = ?, - error_message = ?, - token_count_raw = ?, - token_count_simplified = ?, - processing_time_seconds = ?, updated_at = ? WHERE id = ? """, @@ -98,10 +98,6 @@ class ArticleRepository: article.raw_text, article.simplified_text, article.status.value, - article.error_message, - article.token_count_raw, - article.token_count_simplified, - article.processing_time_seconds, article.updated_at, article.id, ), @@ -115,8 +111,8 @@ class ArticleRepository: return article async def get_articles_by_status( - self, status: ProcessingStatus, limit: int | None = None - ) -> list[Article]: + self, status: ArticleStatus, limit: int | None = None + ) -> list[ArticleDTO]: query = "SELECT * FROM articles WHERE status = ?" params: tuple[Any, ...] = (status.value,) @@ -124,17 +120,17 @@ class ArticleRepository: query += " LIMIT ?" params = params + (limit,) - async with self.db_service.get_connection() as conn: + async with await self.db_service.get_connection() as conn: cursor = await conn.execute(query, params) rows = await cursor.fetchall() - return [self._row_to_article(row) for row in rows] + return [self._row_to_article_dto(row) for row in rows] - async def get_pending_articles(self, limit: int | None = None) -> list[Article]: - return await self.get_articles_by_status(ProcessingStatus.PENDING, limit) + async def get_pending_articles(self, limit: int | None = None) -> list[ArticleDTO]: + return await self.get_articles_by_status(ArticleStatus.PENDING, limit) - async def count_by_status(self, status: ProcessingStatus) -> int: - async with self.db_service.get_connection() as conn: + async def count_by_status(self, status: ArticleStatus) -> int: + async with await self.db_service.get_connection() as conn: cursor = await conn.execute( "SELECT COUNT(*) FROM articles WHERE status = ?", (status.value,), @@ -143,7 +139,7 @@ class ArticleRepository: return result[0] if result else 0 - async def get_all_articles(self, limit: int | None = None, offset: int = 0) -> list[Article]: + async def get_all_articles(self, limit: int | None = None, offset: int = 0) -> list[ArticleDTO]: query = "SELECT * FROM articles ORDER BY created_at DESC" params: tuple[Any, ...] = () @@ -151,14 +147,14 @@ class ArticleRepository: query += " LIMIT ? OFFSET ?" params = (limit, offset) - async with self.db_service.get_connection() as conn: + async with await self.db_service.get_connection() as conn: cursor = await conn.execute(query, params) rows = await cursor.fetchall() - return [self._row_to_article(row) for row in rows] + return [self._row_to_article_dto(row) for row in rows] async def delete_article(self, article_id: int) -> bool: - async with self.db_service.get_connection() as conn: + async with await self.db_service.get_connection() as conn: cursor = await conn.execute( "DELETE FROM articles WHERE id = ?", (article_id,), @@ -171,18 +167,14 @@ class ArticleRepository: return deleted - def _row_to_article(self, row: aiosqlite.Row) -> Article: - return Article( + def _row_to_article_dto(self, row: aiosqlite.Row) -> ArticleDTO: + return ArticleDTO( id=row["id"], url=row["url"], title=row["title"], raw_text=row["raw_text"], simplified_text=row["simplified_text"], - status=ProcessingStatus(row["status"]), - error_message=row["error_message"], - token_count_raw=row["token_count_raw"], - token_count_simplified=row["token_count_simplified"], - processing_time_seconds=row["processing_time_seconds"], - created_at=row["created_at"], - updated_at=row["updated_at"], + status=ArticleStatus(row["status"]), + created_at=datetime.fromisoformat(row["created_at"]) if row["created_at"] else None, + updated_at=datetime.fromisoformat(row["updated_at"]) if row["updated_at"] else None, ) diff --git a/src/services/simplify_service.py b/src/services/simplify_service.py index 703cac2..370f32e 100644 --- a/src/services/simplify_service.py +++ b/src/services/simplify_service.py @@ -8,7 +8,7 @@ import structlog from src.adapters.llm import LLMProviderAdapter, LLMTokenLimitError from src.adapters.ruwiki import RuWikiAdapter -from src.models import AppConfig, ArticleCreate, ProcessingResult, SimplifyCommand +from src.models import AppConfig, ProcessingResult, SimplifyCommand from src.models.constants import LLM_MAX_INPUT_TOKENS, MAX_TOKEN_LIMIT_WITH_BUFFER from src.services.repository import ArticleRepository from src.services.text_splitter import RecursiveCharacterTextSplitter @@ -70,7 +70,9 @@ class SimplifyService: page_info = await self.ruwiki_adapter.fetch_page_cleaned(command.url) article = await self._create_or_update_article(command, page_info) - article.mark_processing() + # Отмечаем статью как обрабатываемую + from src.models.article_dto import ArticleStatus + article.status = ArticleStatus.PENDING # В новой схеме используем PENDING для обработки await self.repository.update_article(article) simplified_text, input_tokens, output_tokens = await self._simplify_article_text( @@ -111,21 +113,19 @@ class SimplifyService: title=existing_article.title, raw_text=existing_article.raw_text, simplified_text=existing_article.simplified_text, - token_count_raw=existing_article.token_count_raw or 0, - token_count_simplified=existing_article.token_count_simplified or 0, - processing_time_seconds=existing_article.processing_time_seconds or 0, + token_count_raw=0, # ArticleDTO не хранит token counts + token_count_simplified=0, # ArticleDTO не хранит token counts + processing_time_seconds=0, # ArticleDTO не хранит processing time ) return None async def _create_or_update_article(self, command, page_info): - article_data = ArticleCreate( - url=command.url, - title=page_info.title, - raw_text=page_info.content, - ) - try: - return await self.repository.create_article(article_data) + return await self.repository.create_article( + url=command.url, + title=page_info.title, + raw_text=page_info.content, + ) except ValueError: article = await self.repository.get_by_url(command.url) if not article: @@ -133,9 +133,10 @@ class SimplifyService: raise ValueError(msg) from None if command.force_reprocess: + from src.models.article_dto import ArticleStatus article.title = page_info.title article.raw_text = page_info.content - article.mark_processing() + article.status = ArticleStatus.PENDING # Эквивалент mark_processing await self.repository.update_article(article) return article diff --git a/src/services/write_queue.py b/src/services/write_queue.py index 38e8bd1..361bf56 100644 --- a/src/services/write_queue.py +++ b/src/services/write_queue.py @@ -5,7 +5,8 @@ from dataclasses import dataclass, field import structlog -from src.models import Article, ProcessingResult +from src.models import ProcessingResult +from src.models.article_dto import ArticleDTO from src.models.constants import WRITE_QUEUE_BATCH_SIZE from src.services.repository import ArticleRepository @@ -13,9 +14,9 @@ from src.services.repository import ArticleRepository @dataclass class WriteOperation: operation_type: str - article: Article | None = None + article: ArticleDTO | None = None result: ProcessingResult | None = None - future: asyncio.Future[Article] | None = field(default=None, init=False) + future: asyncio.Future[ArticleDTO] | None = field(default=None, init=False) class AsyncWriteQueue: @@ -56,15 +57,15 @@ class AsyncWriteQueue: self.logger.info("Write queue остановлена") - async def update_article(self, article: Article) -> None: + async def update_article(self, article: ArticleDTO) -> None: operation = WriteOperation( operation_type="update", article=article, ) await self._queue.put(operation) - async def update_from_result(self, result: ProcessingResult) -> Article: - future: asyncio.Future[Article] = asyncio.Future() + async def update_from_result(self, result: ProcessingResult) -> ArticleDTO: + future: asyncio.Future[ArticleDTO] = asyncio.Future() operation = WriteOperation( operation_type="update_from_result", @@ -143,7 +144,7 @@ class AsyncWriteQueue: msg = f"Неизвестный тип операции: {operation.operation_type}" raise ValueError(msg) - async def _update_article_from_result(self, result: ProcessingResult) -> Article: + async def _update_article_from_result(self, result: ProcessingResult) -> ArticleDTO: article = await self.repository.get_by_url(result.url) if not article: msg = f"Статья с URL {result.url} не найдена" @@ -154,14 +155,13 @@ class AsyncWriteQueue: msg = "Неполные данные в успешном результате" raise ValueError(msg) - article.mark_completed( - simplified_text=result.simplified_text, - token_count_raw=result.token_count_raw or 0, - token_count_simplified=result.token_count_simplified or 0, - processing_time=result.processing_time_seconds or 0, - ) + # Обновляем поля напрямую, так как у ArticleDTO нет методов mark_* + from src.models.article_dto import ArticleStatus + article.simplified_text = result.simplified_text + article.status = ArticleStatus.SIMPLIFIED else: - article.mark_failed(result.error_message or "Неизвестная ошибка") + from src.models.article_dto import ArticleStatus + article.status = ArticleStatus.FAILED return await self.repository.update_article(article) diff --git a/tests/conftest.py b/tests/conftest.py index 1eb54ef..84be689 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -13,7 +13,6 @@ from src.models import AppConfig, Article, ArticleCreate, ProcessingStatus @pytest.fixture(scope="session") def event_loop() -> Generator[asyncio.AbstractEventLoop, None, None]: - """Создать event loop для всей сессии тестов.""" loop = asyncio.new_event_loop() yield loop loop.close() @@ -21,7 +20,6 @@ def event_loop() -> Generator[asyncio.AbstractEventLoop, None, None]: @pytest.fixture def test_config() -> AppConfig: - """Тестовая конфигурация.""" with tempfile.TemporaryDirectory() as temp_dir: db_path = Path(temp_dir) / "test.db" return AppConfig( @@ -38,7 +36,6 @@ def test_config() -> AppConfig: @pytest.fixture def sample_wiki_urls() -> list[str]: - """Список тестовых URL википедии.""" return [ "https://ru.wikipedia.org/wiki/Тест", "https://ru.wikipedia.org/wiki/Пример", @@ -48,7 +45,6 @@ def sample_wiki_urls() -> list[str]: @pytest.fixture def invalid_urls() -> list[str]: - """Список невалидных URL.""" return [ "https://example.com/invalid", "https://en.wikipedia.org/wiki/English", diff --git a/tests/test_services.py b/tests/test_services.py index b0e9c0d..30563a7 100644 --- a/tests/test_services.py +++ b/tests/test_services.py @@ -108,7 +108,7 @@ class TestDatabaseService: db_service = DatabaseService(test_config) await db_service.initialize_database() - async with db_service.get_connection() as conn: + async with await db_service.get_connection() as conn: cursor = await conn.execute("SELECT 1") result = await cursor.fetchone() assert result[0] == 1