qopscribe/speech_service/main.py

54 lines
1.7 KiB
Python

import asyncio
import json
from config import load_config
from models import AudioTask
from redis_client import RedisClient
from transcriber import WhisperTranscriber
async def process_audio_task(redis_client: RedisClient, transcriber: WhisperTranscriber, task_data: dict) -> None:
    """Transcribe one audio task and publish the recognised text.

    Builds an ``AudioTask`` from ``task_data``, calls
    ``transcriber.transcribe`` with the task's ``file_path`` in the event
    loop's default executor, and passes a result dict (``chat_id``,
    ``user_id``, ``message_id``, ``text``) to
    ``redis_client.publish_result``.

    Every failure is reported with ``print`` and the coroutine then returns
    normally. ``main`` schedules this coroutine with ``asyncio.create_task``
    and never awaits it, so an exception escaping from here would otherwise
    only show up as an un-retrieved task error.

    Args:
        redis_client: Client whose ``publish_result`` receives the result dict.
        transcriber: Object whose ``transcribe`` is called with the file path.
        task_data: Keyword arguments used to construct the ``AudioTask``.
    """
    try:
        task = AudioTask(**task_data)
    except Exception as e:
        print(f"Error creating AudioTask from data: {e}")
        return

    print(f"Processing task {task.uuid} ...")

    try:
        # Run the synchronous transcribe call in the default executor so the
        # event loop stays free to receive further messages meanwhile.
        loop = asyncio.get_running_loop()
        text = await loop.run_in_executor(None, transcriber.transcribe, task.file_path)
    except Exception as e:
        print(f"Error transcribing task {task.uuid}: {e}")
        return

    result = {
        "chat_id": task.chat_id,
        "user_id": task.user_id,
        "message_id": task.message_id,
        "text": text,
    }

    try:
        await redis_client.publish_result(result)
    except Exception as e:
        print(f"Error publishing result for task {task.uuid}: {e}")
        return

    print(f"Published result for task {task.uuid}")
async def main():
    """Run the speech service: receive audio tasks and transcribe them.

    Loads the configuration, creates the ``RedisClient`` and the
    ``WhisperTranscriber``, subscribes through
    ``redis_client.subscribe_tasks()``, and then loops forever. Each
    received message is decoded as JSON and handed to
    ``process_audio_task`` as a background task; messages that fail to
    decode are reported and skipped.
    """
    config = load_config()
    redis_client = RedisClient(
        host=config["REDIS_HOST"],
        port=config["REDIS_PORT"],
        task_channel=config["AUDIO_TASK_CHANNEL"],
        result_channel=config["TEXT_RESULT_CHANNEL"],
    )
    transcriber = WhisperTranscriber(config["WHISPER_MODEL"], config["DEVICE"])
    pubsub = await redis_client.subscribe_tasks()
    print("Subscribed to audio_tasks channel. Waiting for tasks...")

    # The event loop only keeps weak references to tasks, so a task whose
    # handle is dropped may be garbage-collected before it finishes. Hold a
    # strong reference here until each task completes.
    background_tasks = set()

    while True:
        message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
        if message:
            try:
                task_data = json.loads(message["data"])
            except Exception as e:
                print(f"Error parsing task message: {e}")
                continue
            task = asyncio.create_task(process_audio_task(redis_client, transcriber, task_data))
            background_tasks.add(task)
            # Drop the reference once the task is done so the set stays small.
            task.add_done_callback(background_tasks.discard)
        # Brief pause between polls; also yields control to running tasks.
        await asyncio.sleep(0.1)
if __name__ == "__main__":
    # Executed as a script: create the service coroutine and drive it on a
    # fresh event loop until it exits.
    service = main()
    asyncio.run(service)