From 115aea47921188d3cbda0ea1357b849822faee7d Mon Sep 17 00:00:00 2001 From: itqop Date: Mon, 3 Mar 2025 04:09:17 +0300 Subject: [PATCH] test new scheme: Redis list queues; summarization via summarize_service --- speech_service/config.py | 3 +- speech_service/main.py | 63 +++++--------------------- speech_service/redis_client.py | 22 +++++++-- summarize_service/app/redis_client.py | 6 +-- telegram_bot/handlers/audio_handler.py | 15 ++++-- telegram_bot/services/redis_service.py | 42 +++++++++-------- 6 files changed, 68 insertions(+), 83 deletions(-) diff --git a/speech_service/config.py b/speech_service/config.py index e2aa109..85d037d 100644 --- a/speech_service/config.py +++ b/speech_service/config.py @@ -9,6 +9,7 @@ def load_config(): "WHISPER_MODEL": os.getenv("WHISPER_MODEL", "dvislobokov/whisper-large-v3-turbo-russian"), "DEVICE": os.getenv("DEVICE", "cuda"), "AUDIO_TASK_CHANNEL": os.getenv("AUDIO_TASK_CHANNEL", "audio_tasks"), - "TEXT_RESULT_CHANNEL": os.getenv("TEXT_RESULT_CHANNEL", "texts"), + "TEXT_RESULT_CHANNEL": os.getenv("TEXT_RESULT_CHANNEL", "text_result_channel"), + "TEXT_TASK_CHANNEL": os.getenv("TEXT_TASK_CHANNEL", "text_task_channel"), "OLLAMA_URL": os.getenv("OLLAMA_URL", "http://ollama:11434/api/generate/"), } diff --git a/speech_service/main.py b/speech_service/main.py index 37a42aa..aad742c 100644 --- a/speech_service/main.py +++ b/speech_service/main.py @@ -5,42 +5,8 @@ from models import AudioTask from redis_client import RedisClient from transcriber import WhisperTranscriber import httpx -import json -async def request_summary(text: str, ollama_url: str) -> str: - """ - Делает запрос к Ollama API для суммаризации текста. - Использует модель gemma2:2b, системное сообщение и температуру 0.6. - Запрос делается без стриминга. - """ - payload = { - "model": "gemma2:2b", - "prompt": text, - "system": ( - "Ты — помощник для суммаризации текста. Твоя задача: выделить ключевые моменты " - "из высказывания человека, переформулируя их кратко и сохраняя оригинальный смысл. 
" - "Очень важно: не отвечай на вопросы, не рассуждай, не комментируй, не добавляй ничего от себя, " - "выполняй только суммаризацию." - ), - "stream": False, - "options": { - "temperature": 0.6 - } - } - async with httpx.AsyncClient() as client: - try: - response = await client.post(ollama_url, json=payload, timeout=60.0) - response.raise_for_status() - except Exception as e: - print(f"LLM API request failed: {e}") - return text - data = response.json() - summary = data.get("response") - if summary: - return summary.strip() - return text - -async def process_audio_task(redis_client: RedisClient, transcriber: WhisperTranscriber, task_data: dict, ollama_url: str): +async def process_audio_task(redis_client: RedisClient, transcriber: WhisperTranscriber, task_data: dict): try: task = AudioTask(**task_data) except Exception as e: @@ -51,17 +17,15 @@ async def process_audio_task(redis_client: RedisClient, transcriber: WhisperTran loop = asyncio.get_running_loop() text = await loop.run_in_executor(None, transcriber.transcribe, task.file_path) - if task_data.get("sum") == 1: - text = await request_summary(text, ollama_url) - - result = { + # Отправляем текст в сервис суммаризации + summarize_task = { "chat_id": task.chat_id, "user_id": task.user_id, "message_id": task.message_id, "text": text } - await redis_client.publish_result(result) - print(f"Published result for task {task.uuid}") + await redis_client.send_to_summarize(summarize_task) + print(f"Sent text to summarize service for task {task.uuid}") async def main(): @@ -70,22 +34,17 @@ async def main(): host=config["REDIS_HOST"], port=config["REDIS_PORT"], task_channel=config["AUDIO_TASK_CHANNEL"], - result_channel=config["TEXT_RESULT_CHANNEL"] + result_channel=config["TEXT_RESULT_CHANNEL"], + text_task_channel="text_task_channel" ) transcriber = WhisperTranscriber(config["WHISPER_MODEL"], config["DEVICE"]) - pubsub = await redis_client.subscribe_tasks() - print("Subscribed to audio_tasks channel. 
Waiting for tasks...") + print("Waiting for audio tasks...") while True: - message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0) - if message: - try: - task_data = json.loads(message["data"]) - except Exception as e: - print(f"Error parsing task message: {e}") - continue - asyncio.create_task(process_audio_task(redis_client, transcriber, task_data, config["OLLAMA_URL"])) + task_data = await redis_client.get_task(timeout=1) + if task_data: + asyncio.create_task(process_audio_task(redis_client, transcriber, task_data)) await asyncio.sleep(0.1) if __name__ == "__main__": diff --git a/speech_service/redis_client.py b/speech_service/redis_client.py index eabf477..34aeade 100644 --- a/speech_service/redis_client.py +++ b/speech_service/redis_client.py @@ -2,15 +2,27 @@ import json import redis.asyncio as redis class RedisClient: - def __init__(self, host: str, port: int, task_channel: str, result_channel: str): + def __init__(self, host: str, port: int, task_channel: str, result_channel: str, text_task_channel: str = "text_task_channel"): self.client = redis.Redis(host=host, port=port, decode_responses=True) self.task_channel = task_channel self.result_channel = result_channel + self.text_task_channel = text_task_channel - async def subscribe_tasks(self): - pubsub = self.client.pubsub() - await pubsub.subscribe(self.task_channel) - return pubsub + async def get_task(self, timeout=0): + """Получает задачу из очереди с помощью BLPOP""" + result = await self.client.blpop(self.task_channel, timeout=timeout) + if result: + _, task_json = result + try: + return json.loads(task_json) + except Exception as e: + print(f"Error parsing task message: {e}") + return None async def publish_result(self, result: dict): + """Публикует результат в канал результатов""" await self.client.publish(self.result_channel, json.dumps(result)) + + async def send_to_summarize(self, task_data: dict): + """Отправляет текст в сервис суммаризации""" + await 
self.client.rpush(self.text_task_channel, json.dumps(task_data)) diff --git a/summarize_service/app/redis_client.py b/summarize_service/app/redis_client.py index 7c15f1c..408f7f5 100644 --- a/summarize_service/app/redis_client.py +++ b/summarize_service/app/redis_client.py @@ -26,7 +26,7 @@ class RedisClient: if res: _, task_json = res try: - task = Task.parse_raw(task_json) + task = Task.model_validate_json(task_json) tasks.append(task) except Exception as e: print("Ошибка парсинга задачи:", e) @@ -36,7 +36,7 @@ class RedisClient: if task_json is None: break try: - task = Task.parse_raw(task_json) + task = Task.model_validate_json(task_json) tasks.append(task) except Exception as e: print("Ошибка парсинга задачи:", e) @@ -44,4 +44,4 @@ class RedisClient: def publish_result(self, result: dict): result_json = json.dumps(result) - self.client.rpush(self.result_channel, result_json) + self.client.publish(self.result_channel, result_json) diff --git a/telegram_bot/handlers/audio_handler.py b/telegram_bot/handlers/audio_handler.py index bd626d8..a9d3786 100644 --- a/telegram_bot/handlers/audio_handler.py +++ b/telegram_bot/handlers/audio_handler.py @@ -34,28 +34,35 @@ async def handle_voice_and_video(message: types.Message, redis_service, storage_ os.remove(temp_destination) + # Отправляем сообщение пользователю о начале обработки + processing_msg = await message.reply("Обрабатываю аудио, пожалуйста, подождите...") + task_data = { "uuid": file_uuid, "file_path": wav_destination, "user_id": message.from_user.id, "chat_id": message.chat.id, - "message_id": message.message_id, - "sum": 1 + "message_id": message.message_id } await redis_service.publish_task(task_data) - #await message.reply("Waiting for transcription...") text = await redis_service.wait_for_text( user_id=message.from_user.id, chat_id=message.chat.id, message_id=message.message_id ) + + # Удаляем временный файл os.remove(wav_destination) + + # Удаляем сообщение о обработке + await processing_msg.delete() + 
if text: await send_long_message(message, text) else: - await message.reply("Sorry, transcription result was not received within the timeout.") + await message.reply("К сожалению, не удалось получить результат обработки в отведенное время.") async def send_long_message(message: types.Message, text: str): """Отправляет длинный текст, разбивая его на части по 4096 символов""" diff --git a/telegram_bot/services/redis_service.py b/telegram_bot/services/redis_service.py index f16f21d..9d2d594 100644 --- a/telegram_bot/services/redis_service.py +++ b/telegram_bot/services/redis_service.py @@ -1,29 +1,35 @@ import json import redis.asyncio as redis +import asyncio class RedisService: def __init__(self, host: str, port: int): self.client = redis.Redis(host=host, port=port, decode_responses=True) + self.task_channel = "audio_tasks" + self.result_channel = "text_result_channel" async def publish_task(self, task_data: dict): - channel = "audio_tasks" - await self.client.publish(channel, json.dumps(task_data)) + """Отправляет задачу в очередь аудио задач""" + await self.client.rpush(self.task_channel, json.dumps(task_data)) async def wait_for_text(self, user_id: int, chat_id: int, message_id: int, timeout: int = 30): - pubsub = self.client.pubsub() - await pubsub.subscribe("texts") - try: - async for message in pubsub.listen(): - if message["type"] != "message": - continue - try: - data = json.loads(message["data"]) - except Exception: - continue - if (data.get("user_id") == user_id and - data.get("chat_id") == chat_id and - data.get("message_id") == message_id): - return data.get("text") - finally: - await pubsub.unsubscribe("texts") + """Ожидает результат обработки текста с помощью блокирующего ожидания""" + start_time = asyncio.get_event_loop().time() + + while asyncio.get_event_loop().time() - start_time < timeout: + result = await self.client.blpop(self.result_channel, timeout=1) + if not result: + continue + + _, data_json = result + try: + data = json.loads(data_json) + 
except Exception: + continue + + if (data.get("user_id") == user_id and + data.get("chat_id") == chat_id and + data.get("message_id") == message_id): + return data.get("text") + return None