"""Audio transcription worker.

Pulls batches of audio tasks from Redis, transcribes each one with a
Whisper transcriber, and sends the resulting texts back through Redis.
"""
import asyncio
import logging
from typing import Dict, List, Optional

from config import load_config
from models import AudioTask, TextTask
from redis_client import RedisClient
from transcriber import WhisperTranscriber


async def process_audio_tasks_batch(
    redis_client: RedisClient,
    transcriber: WhisperTranscriber,
    tasks_data: List[dict],
):
    """Transcribe a batch of raw audio-task payloads and publish the texts.

    Payloads that cannot be turned into an ``AudioTask`` are logged and
    skipped. The remaining tasks are transcribed concurrently. Tasks whose
    transcription fails are dropped, because ``transcribe_audio`` returns
    ``None`` for them. All successful results are sent in a single
    ``send_texts_batch`` call.

    Args:
        redis_client: Client used to publish the resulting text tasks.
        transcriber: Transcriber shared by every task in the batch.
        tasks_data: Raw task payloads; each is passed as keyword arguments
            to ``AudioTask``.
    """
    audio_tasks = []
    for task_data in tasks_data:
        try:
            audio_tasks.append(AudioTask(**task_data))
        except Exception as e:
            # Keep the traceback: a malformed payload usually points at the producer.
            logging.exception(f"Error creating AudioTask from data: {e}")

    if not audio_tasks:
        return

    logging.info(f"Processing batch of {len(audio_tasks)} audio tasks...")
    results = await asyncio.gather(
        *(transcribe_audio(transcriber, task) for task in audio_tasks)
    )
    text_tasks = [result for result in results if result is not None]

    if text_tasks:
        await redis_client.send_texts_batch(text_tasks)
        logging.info(f"Sent {len(text_tasks)} texts to summarize service")


async def transcribe_audio(
    transcriber: WhisperTranscriber, task: AudioTask
) -> Optional[TextTask]:
    """Transcribe one audio task without blocking the event loop.

    ``transcriber.transcribe(task.file_path)`` is run via
    ``loop.run_in_executor(None, ...)``, so the blocking call executes in the
    loop's default executor.

    Args:
        transcriber: Transcriber whose ``transcribe`` method receives the
            task's ``file_path``.
        task: The audio task to transcribe.

    Returns:
        A ``TextTask`` carrying the task's ``chat_id``, ``user_id``,
        ``message_id`` and the transcribed text. Returns ``None`` if any step
        raised; the error is logged with its traceback.
    """
    try:
        logging.info(f"Transcribing audio for task {task.uuid}...")
        loop = asyncio.get_running_loop()
        # NOTE(review): the default executor is a thread pool unless replaced,
        # and several batches may run at once (see main), so transcribe() can
        # be called from multiple threads concurrently. Confirm
        # WhisperTranscriber tolerates that.
        text = await loop.run_in_executor(None, transcriber.transcribe, task.file_path)
        logging.info(f"Transcription completed for task {task.uuid}, text length: {len(text)}")
        return TextTask(
            chat_id=task.chat_id,
            user_id=task.user_id,
            message_id=task.message_id,
            text=text,
        )
    except Exception as e:
        logging.exception(f"Error transcribing audio for task {task.uuid}: {e}")
        return None


def _log_batch_failure(batch_task: asyncio.Task) -> None:
    """Log the exception of a finished background batch task, if it raised."""
    if batch_task.cancelled():
        return
    exc = batch_task.exception()
    if exc is not None:
        logging.error("Audio batch processing failed", exc_info=exc)


async def main():
    """Run the worker loop forever.

    Loads config, builds the Redis client and the transcriber, then
    repeatedly fetches up to ``BATCH_SIZE`` tasks (waiting up to
    ``WAIT_TIMEOUT``). Each non-empty batch is processed in a background
    task so polling continues while transcription runs.
    """
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    config = load_config()
    logging.info(f"Loaded config: REDIS_HOST={config['REDIS_HOST']}, REDIS_PORT={config['REDIS_PORT']}")

    redis_client = RedisClient(
        host=config["REDIS_HOST"],
        port=config["REDIS_PORT"],
        task_channel=config["AUDIO_TASK_CHANNEL"],
        result_channel=config["TEXT_RESULT_CHANNEL"],
        text_task_channel=config["TEXT_TASK_CHANNEL"]
    )

    transcriber = WhisperTranscriber(config["WHISPER_MODEL"], config["DEVICE"])
    logging.info(f"Initialized transcriber with model {config['WHISPER_MODEL']} on {config['DEVICE']}")

    batch_size = config["BATCH_SIZE"]
    wait_timeout = config["WAIT_TIMEOUT"]

    logging.info(f"Waiting for audio tasks in channel {config['AUDIO_TASK_CHANNEL']} with batch size {batch_size}...")

    # The event loop keeps only weak references to tasks, so a fire-and-forget
    # create_task() result can be garbage-collected mid-run. Hold strong
    # references here until each batch finishes.
    background_tasks = set()

    while True:
        tasks_data = await redis_client.get_tasks_batch(batch_size, wait_timeout)

        if tasks_data:
            logging.info(f"Received {len(tasks_data)} tasks")
            # NOTE(review): there is no cap on concurrently running batches;
            # consider backpressure if producers can outpace transcription.
            batch_task = asyncio.create_task(
                process_audio_tasks_batch(redis_client, transcriber, tasks_data)
            )
            background_tasks.add(batch_task)
            batch_task.add_done_callback(background_tasks.discard)
            # Surface failures (e.g. from send_texts_batch) instead of losing them.
            batch_task.add_done_callback(_log_batch_failure)

        await asyncio.sleep(0.1)


if __name__ == "__main__":
    asyncio.run(main())