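"""Read and validate Wikipedia article URLs from a plain-text file."""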
from __future__ import annotations

import asyncio
from pathlib import Path
from typing import AsyncGenerator
from urllib.parse import urlparse

import structlog

from src.models import SimplifyCommand
from src.models.constants import ARTICLE_NAME_INDEX, MIN_WIKI_PATH_PARTS, WIKI_PATH_INDEX


class FileSource:
    """Source of SimplifyCommand objects backed by a plain-text file of URLs."""

    def __init__(self, file_path: str) -> None:
        self.file_path = Path(file_path)
        self.logger = structlog.get_logger().bind(source="file", path=str(self.file_path))

    async def read_urls(
        self, *, force_reprocess: bool = False
    ) -> AsyncGenerator[SimplifyCommand, None]:
        """Yield a SimplifyCommand for each valid, unique Wikipedia URL in the file.

        Blank lines and lines starting with "#" are ignored; invalid URLs and
        duplicates are skipped and counted.
        """
        if not self.file_path.exists():
            msg = f"URL file not found: {self.file_path}"
            raise FileNotFoundError(msg)

        self.logger.info("Reading URLs from file")

        # Read the file in a worker thread so the event loop is not blocked.
        content = await asyncio.to_thread(self._read_file_sync)

        seen_urls: set[str] = set()
        valid_count = 0
        invalid_count = 0

        for line_num, original_line in enumerate(content.splitlines(), 1):
            line = original_line.strip()

            # Skip blank lines and comments.
            if not line or line.startswith("#"):
                continue

            if not self._is_valid_wikipedia_url(line):
                self.logger.warning("Invalid URL", line_number=line_num, url=line)
                invalid_count += 1
                continue

            if line in seen_urls:
                self.logger.debug("Duplicate URL skipped", line_number=line_num, url=line)
                continue

            seen_urls.add(line)
            valid_count += 1

            yield SimplifyCommand(url=line, force_reprocess=force_reprocess)

        self.logger.info(
            "Finished reading URLs",
            valid_count=valid_count,
            invalid_count=invalid_count,
            total_unique=len(seen_urls),
        )

    def _read_file_sync(self) -> str:
        return self.file_path.read_text(encoding="utf-8")
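
    # Expected URL shape (an assumption inferred from the checks below): something
    # like https://<host containing "ruwiki">/wiki/<Article_name>, so parsed.path
    # splits into ["", "wiki", "<Article_name>"]. WIKI_PATH_INDEX, ARTICLE_NAME_INDEX
    # and MIN_WIKI_PATH_PARTS are defined in src.models.constants; their exact values
    # are not shown in this module.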
    def _is_valid_wikipedia_url(self, url: str) -> bool:
        """Return True if the URL looks like a ruwiki article URL."""
        try:
            self.logger.info("Validating URL", raw_url=url)

            parsed = urlparse(url)
            self.logger.info(
                "Parsed URL", scheme=parsed.scheme, netloc=parsed.netloc, path=parsed.path
            )

            if parsed.scheme not in ("http", "https"):
                self.logger.info("Rejected: unsupported scheme", scheme=parsed.scheme, url=url)
                return False

            if "ruwiki" not in parsed.netloc:
                self.logger.info(
                    "Rejected: domain does not contain 'ruwiki'", netloc=parsed.netloc, url=url
                )
                return False

            path_parts = parsed.path.split("/")
            self.logger.info("Path parts", path_parts=path_parts)

            if len(path_parts) < MIN_WIKI_PATH_PARTS:
                self.logger.info(
                    "Rejected: too few path segments", parts=path_parts, url=url
                )
                return False

            if path_parts[WIKI_PATH_INDEX] != "wiki":
                self.logger.info(
                    "Rejected: unexpected path segment",
                    expected="wiki",
                    actual=path_parts[WIKI_PATH_INDEX],
                    url=url,
                )
                return False

            article_name = path_parts[ARTICLE_NAME_INDEX]
            self.logger.info("Extracted article name", article_name=article_name, url=url)

            if not article_name or article_name in ("Main_Page", "Заглавная_страница"):
                self.logger.info(
                    "Rejected: invalid article name", article_name=article_name, url=url
                )
                return False

            self.logger.info("URL passed all checks", url=url)
            return True

        except Exception as e:
            self.logger.info("Error while validating URL", error=str(e), url=url)
            return False

    async def count_urls(self) -> int:
        """Count valid, unique URLs by consuming read_urls()."""
        count = 0
        async for _ in self.read_urls():
            count += 1
        return count
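

# Minimal usage sketch, assuming a local "urls.txt" file; the path and the printed
# field are illustrative, and SimplifyCommand.url is inferred from the constructor
# call in read_urls() above.
if __name__ == "__main__":

    async def _demo() -> None:
        source = FileSource("urls.txt")
        print("Valid unique URLs:", await source.count_urls())
        async for command in source.read_urls(force_reprocess=False):
            print(command.url)

    asyncio.run(_demo())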