2025-02-23 16:22:33 +01:00
|
|
|
import asyncio
|
2025-03-03 02:46:27 +01:00
|
|
|
import logging
|
2025-02-23 16:22:33 +01:00
|
|
|
from config import load_config
|
2025-03-03 03:22:31 +01:00
|
|
|
from models import AudioTask, TextTask
|
2025-02-23 16:22:33 +01:00
|
|
|
from redis_client import RedisClient
|
|
|
|
from transcriber import WhisperTranscriber
|
2025-03-03 03:22:31 +01:00
|
|
|
from typing import List, Dict
|
2025-02-23 16:22:33 +01:00
|
|
|
|
2025-03-03 03:22:31 +01:00
|
|
|
async def process_audio_tasks_batch(redis_client: RedisClient, transcriber: WhisperTranscriber, tasks_data: List[dict]):
    """Transcribe a batch of raw audio-task payloads and forward the texts.

    Each element of ``tasks_data`` is unpacked via ``AudioTask(**task_data)``;
    payloads that fail to construct are logged and skipped. The remaining
    tasks are transcribed concurrently with ``transcribe_audio``. Results that
    came back as ``None`` (failed transcriptions) are dropped, and the rest
    are sent in a single ``redis_client.send_texts_batch`` call.

    ``main`` schedules this coroutine with ``asyncio.create_task`` and never
    awaits it, so send failures are logged here instead of being raised into
    a task whose exception nobody retrieves.
    """
    audio_tasks = []
    for task_data in tasks_data:
        try:
            audio_tasks.append(AudioTask(**task_data))
        except Exception as e:
            # Keep the traceback: a bad payload is otherwise hard to diagnose.
            logging.exception(f"Error creating AudioTask from data: {e}")

    if not audio_tasks:
        return

    logging.info(f"Processing batch of {len(audio_tasks)} audio tasks...")

    # Transcribe all tasks of the batch concurrently.
    results = await asyncio.gather(
        *(transcribe_audio(transcriber, task) for task in audio_tasks)
    )

    # transcribe_audio returns None for tasks it could not transcribe.
    text_tasks = [text_task for text_task in results if text_task is not None]

    if not text_tasks:
        return

    try:
        await redis_client.send_texts_batch(text_tasks)
    except Exception:
        logging.exception(f"Failed to send {len(text_tasks)} texts to summarize service")
        return

    logging.info(f"Sent {len(text_tasks)} texts to summarize service")
|
2025-02-23 16:22:33 +01:00
|
|
|
|
2025-03-03 03:22:31 +01:00
|
|
|
async def transcribe_audio(transcriber: WhisperTranscriber, task: AudioTask) -> "TextTask | None":
    """Transcribe one audio task and wrap the result in a ``TextTask``.

    ``transcriber.transcribe(task.file_path)`` is run in the running loop's
    default executor (presumably because it is a blocking call), so the event
    loop stays free while the model works.

    Args:
        transcriber: The Whisper transcriber used for this task.
        task: The audio task; its ``file_path`` is transcribed, and its
            ``chat_id``/``user_id``/``message_id`` are copied to the result.

    Returns:
        A ``TextTask`` containing the transcribed text, or ``None`` if any step
        failed. The failure is logged with its traceback rather than raised,
        so a single bad file does not abort the whole batch.
    """
    try:
        logging.info(f"Transcribing audio for task {task.uuid}...")
        loop = asyncio.get_running_loop()
        text = await loop.run_in_executor(None, transcriber.transcribe, task.file_path)
        logging.info(f"Transcription completed for task {task.uuid}, text length: {len(text)}")

        return TextTask(
            chat_id=task.chat_id,
            user_id=task.user_id,
            message_id=task.message_id,
            text=text
        )
    except Exception as e:
        # logging.exception records the traceback along with the message.
        logging.exception(f"Error transcribing audio for task {task.uuid}: {e}")
        return None
|
2025-02-23 22:22:12 +01:00
|
|
|
|
2025-02-23 16:22:33 +01:00
|
|
|
async def main():
    """Run the audio-transcription worker loop forever.

    Configures logging, builds the Redis client and Whisper transcriber from
    ``load_config()``, then repeatedly calls
    ``redis_client.get_tasks_batch(BATCH_SIZE, WAIT_TIMEOUT)``. Each non-empty
    batch is handed to ``process_audio_tasks_batch`` in a background task, so
    fetching the next batch does not wait for transcription to finish.
    """
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    config = load_config()
    logging.info(f"Loaded config: REDIS_HOST={config['REDIS_HOST']}, REDIS_PORT={config['REDIS_PORT']}")

    redis_client = RedisClient(
        host=config["REDIS_HOST"],
        port=config["REDIS_PORT"],
        task_channel=config["AUDIO_TASK_CHANNEL"],
        result_channel=config["TEXT_RESULT_CHANNEL"],
        text_task_channel=config["TEXT_TASK_CHANNEL"]
    )

    transcriber = WhisperTranscriber(config["WHISPER_MODEL"], config["DEVICE"])
    logging.info(f"Initialized transcriber with model {config['WHISPER_MODEL']} on {config['DEVICE']}")

    batch_size = config["BATCH_SIZE"]
    wait_timeout = config["WAIT_TIMEOUT"]
    logging.info(f"Waiting for audio tasks in channel {config['AUDIO_TASK_CHANNEL']} with batch size {batch_size}...")

    # The event loop keeps only weak references to tasks, so an un-referenced
    # task can be garbage-collected mid-execution. Hold strong references
    # until each batch task completes.
    background_tasks = set()

    def _on_batch_done(batch_task):
        # Drop the strong reference, and surface any error the batch raised;
        # otherwise asyncio reports it only as "exception was never retrieved".
        background_tasks.discard(batch_task)
        if batch_task.cancelled():
            return
        exc = batch_task.exception()
        if exc is not None:
            logging.error("Unhandled error while processing audio batch", exc_info=exc)

    while True:
        tasks_data = await redis_client.get_tasks_batch(batch_size, wait_timeout)
        if tasks_data:
            logging.info(f"Received {len(tasks_data)} tasks")
            # NOTE(review): the number of in-flight batches is unbounded; if
            # transcription is slower than intake, consider capping it (e.g.
            # with a semaphore) to protect the transcription device.
            batch_task = asyncio.create_task(process_audio_tasks_batch(redis_client, transcriber, tasks_data))
            background_tasks.add(batch_task)
            batch_task.add_done_callback(_on_batch_done)

        await asyncio.sleep(0.1)
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: drive the async worker loop on a fresh event loop.
    worker = main()
    asyncio.run(worker)
|