"""Queue-based asynchronous runner that feeds commands read from a file to a worker pool."""

import asyncio
import signal
import time
from types import FrameType
from typing import Any

import structlog

from .models import AppConfig, ProcessingResult, ProcessingStats, SimplifyCommand
from .services import SimplifyService
from .sources import FileSource

logger = structlog.get_logger()


class AsyncRunner:
    """Process ``SimplifyCommand`` items concurrently with a fixed number of workers.

    Commands are loaded from a ``FileSource`` into an ``asyncio.Queue`` and
    consumed by ``max_workers`` worker tasks, each of which hands the command
    to ``SimplifyService.process_command`` and records the outcome in
    ``self.stats``.
    """

    def __init__(
        self,
        config: AppConfig,
        simplify_service: SimplifyService,
        max_workers: int = 10,
    ) -> None:
        """Store collaborators and create the (initially empty) queue and state.

        Args:
            config: Application configuration (kept on the instance).
            simplify_service: Service used to process each command.
            max_workers: Number of worker tasks to start per run.
        """
        self.config = config
        self.simplify_service = simplify_service
        self.max_workers = max_workers

        self._task_queue: asyncio.Queue[SimplifyCommand] = asyncio.Queue()
        self._workers: list[asyncio.Task[None]] = []
        self._shutdown_event = asyncio.Event()
        # Handlers that were installed before ours, keyed by signal number,
        # so that _cleanup() can put them back.
        self._previous_signal_handlers: dict[int, Any] = {}

        self.stats = ProcessingStats()
        self._start_time: float | None = None
        self.logger = structlog.get_logger().bind(service="runner")

    async def run_from_file(
        self,
        input_file: str,
        force_reprocess: bool = False,
        max_articles: int | None = None,
    ) -> ProcessingStats:
        """Load commands from ``input_file``, process them and return the stats.

        Args:
            input_file: Path passed to ``FileSource``.
            force_reprocess: Forwarded to ``FileSource.read_urls``.
            max_articles: Maximum number of commands to enqueue; ``None`` or
                ``0`` means no limit.

        Returns:
            The runner's ``ProcessingStats`` (accumulated on ``self.stats``).

        Raises:
            Exception: Any error raised while loading or processing is logged
                and re-raised after cleanup.
        """
        self.logger.info(
            "Запуск обработки статей из файла",
            input_file=input_file,
            force_reprocess=force_reprocess,
            max_workers=self.max_workers,
            max_articles=max_articles,
        )

        # A previous run leaves the event set; without clearing it the new
        # workers would exit immediately and nothing would be processed.
        self._shutdown_event.clear()
        self._setup_signal_handlers()

        try:
            source = FileSource(input_file)
            await self._load_tasks_from_source(source, force_reprocess, max_articles)
            await self._run_processing()
        except Exception as e:
            self.logger.error("Ошибка при выполнении runner", error=str(e))
            raise
        finally:
            await self._cleanup()

        return self.stats

    async def _load_tasks_from_source(
        self,
        source: FileSource,
        force_reprocess: bool,
        max_articles: int | None,
    ) -> None:
        """Put commands yielded by ``source.read_urls`` onto the task queue.

        Loading stops early once ``max_articles`` commands were enqueued
        (a falsy ``max_articles`` disables the limit) or when a shutdown has
        been requested.
        """
        loaded_count = 0

        async for command in source.read_urls(force_reprocess=force_reprocess):
            if max_articles and loaded_count >= max_articles:
                break
            if self._shutdown_event.is_set():
                # No point in queueing work nobody is going to pick up.
                break
            await self._task_queue.put(command)
            loaded_count += 1

        self.logger.info("Задачи загружены в очередь", count=loaded_count)

    async def _run_processing(self) -> None:
        """Start the workers and wait until the queue is drained or a stop is requested."""
        self._start_time = time.time()
        self.logger.info("Запуск worker корутин", count=self.max_workers)

        for i in range(self.max_workers):
            worker = asyncio.create_task(self._worker_loop(worker_id=i))
            self._workers.append(worker)

        await self._wait_for_completion()

        # Tell the workers to leave their polling loop, then wait for them.
        self._shutdown_event.set()
        if self._workers:
            await asyncio.gather(*self._workers, return_exceptions=True)

    async def _wait_for_completion(self) -> None:
        """Return when the queue is fully processed, shutdown is requested, or all workers exited.

        Waiting on ``Queue.join()`` alone would block forever after a shutdown
        request (or after every worker died), because the items still in the
        queue would never be marked done.
        """
        waiters: list[asyncio.Task[Any]] = [
            asyncio.create_task(self._task_queue.join()),
            asyncio.create_task(self._shutdown_event.wait()),
        ]
        if self._workers:
            # asyncio.wait() does not cancel the awaited tasks when it is
            # itself cancelled, so the workers are left untouched below.
            waiters.append(asyncio.create_task(asyncio.wait(self._workers)))

        try:
            await asyncio.wait(waiters, return_when=asyncio.FIRST_COMPLETED)
        finally:
            for waiter in waiters:
                waiter.cancel()
            await asyncio.gather(*waiters, return_exceptions=True)

    async def _worker_loop(self, worker_id: int) -> None:
        """Consume commands from the queue until shutdown is requested.

        The queue is polled with a one-second timeout so the shutdown flag is
        re-checked regularly. Every command taken from the queue is marked
        done, whether processing succeeded, failed or raised.
        """
        worker_logger = self.logger.bind(worker_id=worker_id)
        worker_logger.info("Worker запущен")

        processed_count = 0

        while not self._shutdown_event.is_set():
            try:
                try:
                    command = await asyncio.wait_for(
                        self._task_queue.get(),
                        timeout=1.0,
                    )
                except asyncio.TimeoutError:
                    continue

                try:
                    result = await self.simplify_service.process_command(command)
                    self.stats.add_result(result)
                    processed_count += 1

                    if result.success:
                        worker_logger.info(
                            "Статья обработана успешно",
                            url=command.url,
                            title=result.title,
                            tokens_in=result.token_count_raw,
                            tokens_out=result.token_count_simplified,
                        )
                    else:
                        worker_logger.warning(
                            "Ошибка при обработке статьи",
                            url=command.url,
                            error=result.error_message,
                        )
                except Exception as e:
                    worker_logger.error(
                        "Неожиданная ошибка в worker",
                        url=command.url,
                        error=str(e),
                    )
                    # Record the crash as a failed result so it shows up in the stats.
                    error_result = ProcessingResult.failure_result(
                        command.url,
                        f"Неожиданная ошибка: {e!s}",
                    )
                    self.stats.add_result(error_result)
                finally:
                    self._task_queue.task_done()
            except Exception as e:
                worker_logger.error("Критическая ошибка в worker loop", error=str(e))
                break

        worker_logger.info("Worker завершён", processed_articles=processed_count)

    def _setup_signal_handlers(self) -> None:
        """Install SIGINT/SIGTERM handlers that request a graceful shutdown.

        The handlers that were active before are remembered so that
        ``_cleanup`` can restore them. ``signal.signal`` raises ``ValueError``
        outside the main thread; in that case only a warning is logged.
        """

        def signal_handler(signum: int, frame: FrameType | None) -> None:
            signal_name = signal.Signals(signum).name
            self.logger.info(f"Получен сигнал {signal_name}, начинаем graceful shutdown")
            self._shutdown_event.set()

        try:
            for signum in (signal.SIGINT, signal.SIGTERM):
                previous = signal.signal(signum, signal_handler)
                # Keep the oldest handler if setup is somehow called twice.
                self._previous_signal_handlers.setdefault(signum, previous)
        except ValueError:
            self.logger.warning("Не удалось настроить обработчики сигналов")

    def _restore_signal_handlers(self) -> None:
        """Reinstall the signal handlers that were active before the run."""
        while self._previous_signal_handlers:
            signum, previous = self._previous_signal_handlers.popitem()
            if previous is None:
                # The old handler was not installed from Python and cannot
                # be passed back to signal.signal().
                continue
            try:
                signal.signal(signum, previous)
            except ValueError:
                self.logger.warning("Не удалось восстановить обработчики сигналов")

    async def _cleanup(self) -> None:
        """Restore signal handlers, cancel workers that are still running and forget them."""
        self.logger.info("Начинаем очистку ресурсов")

        self._restore_signal_handlers()

        for worker in self._workers:
            if not worker.done():
                worker.cancel()

        if self._workers:
            results = await asyncio.gather(*self._workers, return_exceptions=True)
            cancelled_count = sum(
                1 for r in results if isinstance(r, asyncio.CancelledError)
            )
            if cancelled_count > 0:
                self.logger.info("Workers отменены", count=cancelled_count)

        self._workers.clear()

    def get_progress_info(self) -> dict[str, Any]:
        """Return a snapshot of counters, throughput, queue size and live workers.

        ``elapsed_time`` and ``articles_per_minute`` are ``0`` until
        processing has started.
        """
        elapsed_time = time.time() - self._start_time if self._start_time else 0

        articles_per_minute = 0
        if elapsed_time > 0:
            articles_per_minute = (self.stats.successful * 60) / elapsed_time

        return {
            "total_processed": self.stats.total_processed,
            "successful": self.stats.successful,
            "failed": self.stats.failed,
            "success_rate": self.stats.success_rate,
            "elapsed_time": elapsed_time,
            "articles_per_minute": articles_per_minute,
            "queue_size": self._task_queue.qsize(),
            "active_workers": len([w for w in self._workers if not w.done()]),
        }

    async def health_check(self) -> dict[str, Any]:
        """Return the service's health checks extended with runner state.

        The dict returned by ``simplify_service.health_check()`` is updated in
        place with ``runner_active``, ``queue_size`` and ``workers_count``.
        """
        checks = await self.simplify_service.health_check()

        checks.update(
            {
                "runner_active": bool(self._workers and not self._shutdown_event.is_set()),
                "queue_size": self._task_queue.qsize(),
                "workers_count": len(self._workers),
            }
        )

        return checks