import json import redis.asyncio as redis from typing import List, Optional from models import TextTask class RedisClient: def __init__(self, host: str, port: int, task_channel: str, result_channel: str, text_task_channel: str = "text_task_channel"): self.client = redis.Redis(host=host, port=port, decode_responses=True) self.task_channel = task_channel self.result_channel = result_channel self.text_task_channel = text_task_channel async def get_task(self, timeout=0): """Получает задачу из очереди с помощью BLPOP""" result = await self.client.blpop(self.task_channel, timeout=timeout) if result: _, task_json = result try: return json.loads(task_json) except Exception as e: print(f"Error parsing task message: {e}") return None async def get_tasks_batch(self, batch_size: int, timeout: int = 1) -> List[dict]: """Получает батч задач из очереди""" tasks = [] result = await self.client.blpop(self.task_channel, timeout=timeout) if result: _, task_json = result try: task = json.loads(task_json) tasks.append(task) except Exception as e: print(f"Error parsing task message: {e}") if tasks: for _ in range(batch_size - 1): task_json = await self.client.lpop(self.task_channel) if not task_json: break try: task = json.loads(task_json) tasks.append(task) except Exception as e: print(f"Error parsing task message: {e}") return tasks async def publish_result(self, result: dict): """Отправляет результат в очередь результатов""" await self.client.rpush(self.result_channel, json.dumps(result)) async def send_to_summarize(self, task_data: dict): """Отправляет текст в сервис суммаризации""" await self.client.rpush(self.text_task_channel, json.dumps(task_data)) async def send_texts_batch(self, tasks: List[TextTask]): """Отправляет батч текстов в сервис суммаризации""" pipeline = self.client.pipeline() for task in tasks: task_data = { "chat_id": task.chat_id, "user_id": task.user_id, "message_id": task.message_id, "text": task.text } pipeline.rpush(self.text_task_channel, json.dumps(task_data)) await pipeline.execute()