# app/redis_client.py
import json
import os
import time
from typing import List, Optional

import redis
from pydantic import BaseModel


class Task(BaseModel):
    """Schema of a single task payload read from the Redis task list.

    Payloads are JSON objects validated by pydantic; a payload that does
    not match these fields fails to parse.
    """

    # NOTE(review): presumably these identify the chat, user and message the
    # text came from -- confirm against the producer of the task list.
    chat_id: int
    user_id: int
    message_id: int
    text: str


class RedisClient:
    """Small wrapper around a Redis connection that moves tasks and results.

    Tasks are popped from the head of the list stored under ``task_channel``
    and results are appended to the tail of the list stored under
    ``result_channel``. Despite the "channel" naming, both keys are used
    with list commands (BLPOP/LPOP/RPUSH), not pub/sub.
    """

    def __init__(self, host: str, port: int, task_channel: str, result_channel: str):
        """Store the connection settings and create the Redis client.

        Args:
            host: Redis server host name or address.
            port: Redis server port.
            task_channel: Key of the list that tasks are popped from.
            result_channel: Key of the list that results are pushed to.
        """
        self.host = host
        self.port = port
        self.task_channel = task_channel
        self.result_channel = result_channel
        # decode_responses=True makes redis-py return ``str`` instead of
        # ``bytes``, so popped payloads can be handed to the parser directly.
        self.client = redis.Redis(host=self.host, port=self.port, decode_responses=True)

    @staticmethod
    def _parse_task(task_json: str) -> Optional[Task]:
        """Parse one raw JSON payload into a Task, or return None on failure."""
        try:
            return Task.parse_raw(task_json)
        except Exception as e:
            # Deliberately best-effort: the payload has already been popped,
            # so a malformed one is reported and dropped instead of aborting
            # the rest of the batch.
            print("Ошибка парсинга задачи:", e)
            return None

    def get_tasks(self, batch_size: int, wait_timeout: int = 5) -> List[Task]:
        """Pop up to ``batch_size`` tasks from the task list.

        The first pop blocks for at most ``wait_timeout`` seconds waiting for
        a task to arrive; the remaining pops are non-blocking and stop as
        soon as the list is empty. Payloads that fail to parse are reported
        and skipped, so they do not count towards ``batch_size``.

        Args:
            batch_size: Maximum number of tasks to return. Values <= 0 return
                an empty list without touching Redis.
            wait_timeout: Seconds to block on the first pop. Per the Redis
                BLPOP semantics, 0 blocks indefinitely.

        Returns:
            The successfully parsed tasks, in the order they were popped.
            May be empty.
        """
        # Popping is destructive: do not consume anything when nothing was
        # requested (previously one task was still popped and returned).
        if batch_size <= 0:
            return []

        tasks: List[Task] = []

        # Block only for the first element so an idle worker does not spin.
        first = self.client.blpop(self.task_channel, timeout=wait_timeout)
        if first:
            # BLPOP replies with a (key, value) pair; only the value is needed.
            _, task_json = first
            task = self._parse_task(task_json)
            if task is not None:
                tasks.append(task)

        # Drain whatever else is immediately available, up to the batch size.
        while len(tasks) < batch_size:
            task_json = self.client.lpop(self.task_channel)
            if task_json is None:
                break
            task = self._parse_task(task_json)
            if task is not None:
                tasks.append(task)

        return tasks

    def publish_result(self, result: dict) -> None:
        """Serialize ``result`` to JSON and append it to the result list.

        Args:
            result: JSON-serializable mapping to publish.

        Raises:
            TypeError: If ``result`` contains values ``json.dumps`` cannot
                serialize.
        """
        result_json = json.dumps(result)
        self.client.rpush(self.result_channel, result_json)