test new scheme

This commit is contained in:
itqop 2025-03-03 04:09:17 +03:00
parent 8e5e9562f5
commit 115aea4792
6 changed files with 68 additions and 83 deletions

View File

@ -9,6 +9,7 @@ def load_config():
"WHISPER_MODEL": os.getenv("WHISPER_MODEL", "dvislobokov/whisper-large-v3-turbo-russian"), "WHISPER_MODEL": os.getenv("WHISPER_MODEL", "dvislobokov/whisper-large-v3-turbo-russian"),
"DEVICE": os.getenv("DEVICE", "cuda"), "DEVICE": os.getenv("DEVICE", "cuda"),
"AUDIO_TASK_CHANNEL": os.getenv("AUDIO_TASK_CHANNEL", "audio_tasks"), "AUDIO_TASK_CHANNEL": os.getenv("AUDIO_TASK_CHANNEL", "audio_tasks"),
"TEXT_RESULT_CHANNEL": os.getenv("TEXT_RESULT_CHANNEL", "texts"), "TEXT_RESULT_CHANNEL": os.getenv("TEXT_RESULT_CHANNEL", "text_result_channel"),
"TEXT_TASK_CHANNEL": os.getenv("TEXT_TASK_CHANNEL", "text_task_channel"),
"OLLAMA_URL": os.getenv("OLLAMA_URL", "http://ollama:11434/api/generate/"), "OLLAMA_URL": os.getenv("OLLAMA_URL", "http://ollama:11434/api/generate/"),
} }

View File

@ -5,42 +5,8 @@ from models import AudioTask
from redis_client import RedisClient from redis_client import RedisClient
from transcriber import WhisperTranscriber from transcriber import WhisperTranscriber
import httpx import httpx
import json
async def request_summary(text: str, ollama_url: str) -> str: async def process_audio_task(redis_client: RedisClient, transcriber: WhisperTranscriber, task_data: dict):
"""
Делает запрос к Ollama API для суммаризации текста.
Использует модель gemma2:2b, системное сообщение и температуру 0.6.
Запрос делается без стриминга.
"""
payload = {
"model": "gemma2:2b",
"prompt": text,
"system": (
"Ты — помощник для суммаризации текста. Твоя задача: выделить ключевые моменты "
"из высказывания человека, переформулируя их кратко и сохраняя оригинальный смысл. "
"Очень важно: не отвечай на вопросы, не рассуждай, не комментируй, не добавляй ничего от себя, "
"выполняй только суммаризацию."
),
"stream": False,
"options": {
"temperature": 0.6
}
}
async with httpx.AsyncClient() as client:
try:
response = await client.post(ollama_url, json=payload, timeout=60.0)
response.raise_for_status()
except Exception as e:
print(f"LLM API request failed: {e}")
return text
data = response.json()
summary = data.get("response")
if summary:
return summary.strip()
return text
async def process_audio_task(redis_client: RedisClient, transcriber: WhisperTranscriber, task_data: dict, ollama_url: str):
try: try:
task = AudioTask(**task_data) task = AudioTask(**task_data)
except Exception as e: except Exception as e:
@ -51,17 +17,15 @@ async def process_audio_task(redis_client: RedisClient, transcriber: WhisperTran
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
text = await loop.run_in_executor(None, transcriber.transcribe, task.file_path) text = await loop.run_in_executor(None, transcriber.transcribe, task.file_path)
if task_data.get("sum") == 1: # Отправляем текст в сервис суммаризации
text = await request_summary(text, ollama_url) summarize_task = {
result = {
"chat_id": task.chat_id, "chat_id": task.chat_id,
"user_id": task.user_id, "user_id": task.user_id,
"message_id": task.message_id, "message_id": task.message_id,
"text": text "text": text
} }
await redis_client.publish_result(result) await redis_client.send_to_summarize(summarize_task)
print(f"Published result for task {task.uuid}") print(f"Sent text to summarize service for task {task.uuid}")
async def main(): async def main():
@ -70,22 +34,17 @@ async def main():
host=config["REDIS_HOST"], host=config["REDIS_HOST"],
port=config["REDIS_PORT"], port=config["REDIS_PORT"],
task_channel=config["AUDIO_TASK_CHANNEL"], task_channel=config["AUDIO_TASK_CHANNEL"],
result_channel=config["TEXT_RESULT_CHANNEL"] result_channel=config["TEXT_RESULT_CHANNEL"],
text_task_channel="text_task_channel"
) )
transcriber = WhisperTranscriber(config["WHISPER_MODEL"], config["DEVICE"]) transcriber = WhisperTranscriber(config["WHISPER_MODEL"], config["DEVICE"])
pubsub = await redis_client.subscribe_tasks() print("Waiting for audio tasks...")
print("Subscribed to audio_tasks channel. Waiting for tasks...")
while True: while True:
message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0) task_data = await redis_client.get_task(timeout=1)
if message: if task_data:
try: asyncio.create_task(process_audio_task(redis_client, transcriber, task_data))
task_data = json.loads(message["data"])
except Exception as e:
print(f"Error parsing task message: {e}")
continue
asyncio.create_task(process_audio_task(redis_client, transcriber, task_data, config["OLLAMA_URL"]))
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -2,15 +2,27 @@ import json
import redis.asyncio as redis import redis.asyncio as redis
class RedisClient: class RedisClient:
def __init__(self, host: str, port: int, task_channel: str, result_channel: str): def __init__(self, host: str, port: int, task_channel: str, result_channel: str, text_task_channel: str = "text_task_channel"):
self.client = redis.Redis(host=host, port=port, decode_responses=True) self.client = redis.Redis(host=host, port=port, decode_responses=True)
self.task_channel = task_channel self.task_channel = task_channel
self.result_channel = result_channel self.result_channel = result_channel
self.text_task_channel = text_task_channel
async def subscribe_tasks(self): async def get_task(self, timeout=0):
pubsub = self.client.pubsub() """Получает задачу из очереди с помощью BLPOP"""
await pubsub.subscribe(self.task_channel) result = await self.client.blpop(self.task_channel, timeout=timeout)
return pubsub if result:
_, task_json = result
try:
return json.loads(task_json)
except Exception as e:
print(f"Error parsing task message: {e}")
return None
async def publish_result(self, result: dict): async def publish_result(self, result: dict):
"""Публикует результат в канал результатов"""
await self.client.publish(self.result_channel, json.dumps(result)) await self.client.publish(self.result_channel, json.dumps(result))
async def send_to_summarize(self, task_data: dict):
"""Отправляет текст в сервис суммаризации"""
await self.client.rpush(self.text_task_channel, json.dumps(task_data))

View File

@ -26,7 +26,7 @@ class RedisClient:
if res: if res:
_, task_json = res _, task_json = res
try: try:
task = Task.parse_raw(task_json) task = Task.model_validate_json(task_json)
tasks.append(task) tasks.append(task)
except Exception as e: except Exception as e:
print("Ошибка парсинга задачи:", e) print("Ошибка парсинга задачи:", e)
@ -36,7 +36,7 @@ class RedisClient:
if task_json is None: if task_json is None:
break break
try: try:
task = Task.parse_raw(task_json) task = Task.model_validate_json(task_json)
tasks.append(task) tasks.append(task)
except Exception as e: except Exception as e:
print("Ошибка парсинга задачи:", e) print("Ошибка парсинга задачи:", e)
@ -44,4 +44,4 @@ class RedisClient:
def publish_result(self, result: dict): def publish_result(self, result: dict):
result_json = json.dumps(result) result_json = json.dumps(result)
self.client.rpush(self.result_channel, result_json) self.client.publish(self.result_channel, result_json)

View File

@ -34,28 +34,35 @@ async def handle_voice_and_video(message: types.Message, redis_service, storage_
os.remove(temp_destination) os.remove(temp_destination)
# Отправляем сообщение пользователю о начале обработки
processing_msg = await message.reply("Обрабатываю аудио, пожалуйста, подождите...")
task_data = { task_data = {
"uuid": file_uuid, "uuid": file_uuid,
"file_path": wav_destination, "file_path": wav_destination,
"user_id": message.from_user.id, "user_id": message.from_user.id,
"chat_id": message.chat.id, "chat_id": message.chat.id,
"message_id": message.message_id, "message_id": message.message_id
"sum": 1
} }
await redis_service.publish_task(task_data) await redis_service.publish_task(task_data)
#await message.reply("Waiting for transcription...")
text = await redis_service.wait_for_text( text = await redis_service.wait_for_text(
user_id=message.from_user.id, user_id=message.from_user.id,
chat_id=message.chat.id, chat_id=message.chat.id,
message_id=message.message_id message_id=message.message_id
) )
# Удаляем временный файл
os.remove(wav_destination) os.remove(wav_destination)
# Удаляем сообщение о обработке
await processing_msg.delete()
if text: if text:
await send_long_message(message, text) await send_long_message(message, text)
else: else:
await message.reply("Sorry, transcription result was not received within the timeout.") await message.reply("К сожалению, не удалось получить результат обработки в отведенное время.")
async def send_long_message(message: types.Message, text: str): async def send_long_message(message: types.Message, text: str):
"""Отправляет длинный текст, разбивая его на части по 4096 символов""" """Отправляет длинный текст, разбивая его на части по 4096 символов"""

View File

@ -1,29 +1,35 @@
import json import json
import redis.asyncio as redis import redis.asyncio as redis
import asyncio
class RedisService: class RedisService:
def __init__(self, host: str, port: int): def __init__(self, host: str, port: int):
self.client = redis.Redis(host=host, port=port, decode_responses=True) self.client = redis.Redis(host=host, port=port, decode_responses=True)
self.task_channel = "audio_tasks"
self.result_channel = "text_result_channel"
async def publish_task(self, task_data: dict): async def publish_task(self, task_data: dict):
channel = "audio_tasks" """Отправляет задачу в очередь аудио задач"""
await self.client.publish(channel, json.dumps(task_data)) await self.client.rpush(self.task_channel, json.dumps(task_data))
async def wait_for_text(self, user_id: int, chat_id: int, message_id: int, timeout: int = 30): async def wait_for_text(self, user_id: int, chat_id: int, message_id: int, timeout: int = 30):
pubsub = self.client.pubsub() """Ожидает результат обработки текста с помощью блокирующего ожидания"""
await pubsub.subscribe("texts") start_time = asyncio.get_event_loop().time()
try:
async for message in pubsub.listen(): while asyncio.get_event_loop().time() - start_time < timeout:
if message["type"] != "message": result = await self.client.blpop(self.result_channel, timeout=1)
continue if not result:
try: continue
data = json.loads(message["data"])
except Exception: _, data_json = result
continue try:
if (data.get("user_id") == user_id and data = json.loads(data_json)
data.get("chat_id") == chat_id and except Exception:
data.get("message_id") == message_id): continue
return data.get("text")
finally: if (data.get("user_id") == user_id and
await pubsub.unsubscribe("texts") data.get("chat_id") == chat_id and
data.get("message_id") == message_id):
return data.get("text")
return None return None