qopscribe/telegram_bot/services/redis_service.py

45 lines
2.0 KiB
Python
Raw Normal View History

2025-02-23 14:12:10 +01:00
import json
2025-02-23 15:24:56 +01:00
import redis.asyncio as redis
2025-03-03 02:09:17 +01:00
import asyncio
2025-02-23 14:12:10 +01:00
class RedisService:
def __init__(self, host: str, port: int):
self.client = redis.Redis(host=host, port=port, decode_responses=True)
2025-03-03 02:09:17 +01:00
self.task_channel = "audio_tasks"
self.result_channel = "text_result_channel"
2025-02-23 14:12:10 +01:00
2025-02-23 15:24:56 +01:00
async def publish_task(self, task_data: dict):
2025-03-03 02:09:17 +01:00
"""Отправляет задачу в очередь аудио задач"""
await self.client.rpush(self.task_channel, json.dumps(task_data))
2025-02-23 15:24:56 +01:00
2025-03-03 03:30:57 +01:00
async def wait_for_text(self, user_id: int, chat_id: int, message_id: int, timeout: int = 180):
2025-03-03 02:09:17 +01:00
"""Ожидает результат обработки текста с помощью блокирующего ожидания"""
start_time = asyncio.get_event_loop().time()
while asyncio.get_event_loop().time() - start_time < timeout:
try:
2025-03-03 02:46:27 +01:00
result = await self.client.blpop(self.result_channel, timeout=1)
if not result:
continue
_, data_json = result
try:
data = json.loads(data_json)
print(f"Получен результат: {data}")
except Exception as e:
print(f"Ошибка при разборе JSON: {e}")
continue
if (data.get("user_id") == user_id and
data.get("chat_id") == chat_id and
data.get("message_id") == message_id):
return data.get("text")
else:
# Если это не наш результат, вернем его обратно в очередь
await self.client.rpush(self.result_channel, data_json)
except Exception as e:
print(f"Ошибка при ожидании результата: {e}")
await asyncio.sleep(0.5)
2025-03-03 02:09:17 +01:00
2025-02-23 15:24:56 +01:00
return None