44 lines
1.9 KiB
Python
44 lines
1.9 KiB
Python
import json
|
||
import redis.asyncio as redis
|
||
import asyncio
|
||
|
||
class RedisService:
|
||
def __init__(self, host: str, port: int):
|
||
self.client = redis.Redis(host=host, port=port, decode_responses=True)
|
||
self.task_channel = "audio_tasks"
|
||
self.result_channel = "text_result_channel"
|
||
|
||
async def publish_task(self, task_data: dict):
|
||
"""Отправляет задачу в очередь аудио задач"""
|
||
await self.client.rpush(self.task_channel, json.dumps(task_data))
|
||
|
||
async def wait_for_text(self, user_id: int, chat_id: int, message_id: int, timeout: int = 180):
|
||
"""Ожидает результат обработки текста с помощью блокирующего ожидания"""
|
||
start_time = asyncio.get_event_loop().time()
|
||
|
||
while asyncio.get_event_loop().time() - start_time < timeout:
|
||
try:
|
||
result = await self.client.blpop(self.result_channel, timeout=1)
|
||
if not result:
|
||
continue
|
||
|
||
_, data_json = result
|
||
try:
|
||
data = json.loads(data_json)
|
||
#print(f"Получен результат: {data}")
|
||
except Exception as e:
|
||
print(f"Ошибка при разборе JSON: {e}")
|
||
continue
|
||
|
||
if (data.get("user_id") == user_id and
|
||
data.get("chat_id") == chat_id and
|
||
data.get("message_id") == message_id):
|
||
return data.get("text")
|
||
else:
|
||
await self.client.rpush(self.result_channel, data_json)
|
||
except Exception as e:
|
||
print(f"Ошибка при ожидании результата: {e}")
|
||
await asyncio.sleep(0.5)
|
||
|
||
return None
|