♾ To infinity and beyond

This commit is contained in:
itqop 2023-12-22 02:43:45 +03:00
parent 192697627a
commit bf83b4d33e
7 changed files with 305 additions and 0 deletions

4
modules/__init__.py Normal file

@@ -0,0 +1,4 @@
from .sst import SST
from .tts import TTS
from .translate import Translate
from .video import Video

80
modules/sst.py Normal file

@@ -0,0 +1,80 @@
import os
import tempfile
import wave

import webrtcvad
from vosk import Model, KaldiRecognizer, SetLogLevel
from moviepy.editor import VideoFileClip


class SST:
    def __init__(self, model_path=None, lang="en-us", vad_aggressiveness=3):
        SetLogLevel(0)
        self.lang = lang
        self.model = Model(model_path=model_path, lang=lang)
        self.recognizer = KaldiRecognizer(self.model, 16000)
        self.recognizer.SetWords(True)
        self.recognizer.SetPartialWords(True)
        # webrtcvad was imported but never instantiated; create the detector here
        self.vad = webrtcvad.Vad(vad_aggressiveness)

    def process_audio_with_timing(self, audio_path):
        with wave.open(audio_path, "rb") as wf:
            if wf.getnchannels() != 1 or wf.getsampwidth() != 2 or wf.getcomptype() != "NONE":
                raise ValueError("Audio file must be WAV format mono PCM.")
            vad_timing = []
            in_speech = False
            # webrtcvad only accepts 10/20/30 ms frames: 30 ms at 16 kHz is 480 frames
            chunk = int(wf.getframerate() * 0.03)
            while True:
                data = wf.readframes(chunk)
                if len(data) < chunk * wf.getsampwidth():
                    break
                is_speech = self.vad.is_speech(data, wf.getframerate())
                # Record the time (in seconds) of every speech on/off transition
                if is_speech and not in_speech:
                    vad_timing.append(wf.tell() / wf.getframerate())
                elif not is_speech and in_speech:
                    vad_timing.append(wf.tell() / wf.getframerate())
                in_speech = is_speech
                if self.recognizer.AcceptWaveform(data):
                    print(self.recognizer.Result())
                else:
                    print(self.recognizer.PartialResult())
            final_result = self.recognizer.FinalResult()
            print(final_result)
            return final_result, vad_timing

    def process_video(self, video_path):
        audio_path = self._extract_audio_from_video(video_path)
        # process_audio did not exist; the actual entry point is process_audio_with_timing
        result = self.process_audio_with_timing(audio_path)
        self._cleanup_temp_file(audio_path)
        return result

    def _extract_audio_from_video(self, video_path):
        audio_tempfile = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
        with VideoFileClip(video_path) as video:
            # Write 16 kHz mono 16-bit PCM so the file passes the WAV checks above
            video.audio.write_audiofile(
                audio_tempfile.name, fps=16000, nbytes=2,
                codec="pcm_s16le", ffmpeg_params=["-ac", "1"])
        return audio_tempfile.name

    def _cleanup_temp_file(self, file_path):
        os.remove(file_path)

    def set_model_language(self, lang):
        # vosk.Model has no set_string; load a fresh model for the new language
        self.lang = lang
        self.model = Model(lang=lang)
        self.recognizer = KaldiRecognizer(self.model, 16000)

    def set_recognizer_params(self, sample_rate=16000, words=True, partial_words=True):
        # KaldiRecognizer takes its sample rate at construction time
        self.recognizer = KaldiRecognizer(self.model, sample_rate)
        self.recognizer.SetWords(words)
        self.recognizer.SetPartialWords(partial_words)
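
A minimal usage sketch for the SST module; the model directory and sample file names are illustrative assumptions, not part of the commit:

from modules import SST

# Hypothetical paths; any Vosk acoustic model directory works here
sst = SST(model_path="models/vosk-model-en-us-0.22")
result_json, vad_timing = sst.process_video("samples/demo.mp4")
print(vad_timing)  # speech on/off boundaries in seconds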

45
modules/translate.py Normal file

@@ -0,0 +1,45 @@
import re

import aiohttp
from langdetect import detect


class Translate:
    def __init__(self, api_url="https://libretranslate.com/translate"):
        self.api_url = api_url

    async def translate_text(self, text, source_lang, target_lang="ru"):
        # LibreTranslate expects a POST with a JSON body and answers
        # with a single object, not a list
        payload = {
            'q': text,
            'source': source_lang,
            'target': target_lang,
            'format': 'text',
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(self.api_url, json=payload) as response:
                data = await response.json()
                return data['translatedText']

    async def preprocess_text(self, text):
        text = re.sub(r'\s+', ' ', text)  # Collapse runs of whitespace
        text = re.sub(r'[^A-Za-z0-9А-Яа-я\s]', '', text)  # Drop special characters
        return text

    async def postprocess_text(self, text):
        return f"[Translated]: {text}"

    async def batch_translate_text(self, text_list, source_lang, target_lang="ru"):
        translations = []
        for text in text_list:
            # preprocess/postprocess are coroutines and must be awaited
            preprocessed_text = await self.preprocess_text(text)
            translated_text = await self.translate_text(preprocessed_text, source_lang, target_lang)
            postprocessed_text = await self.postprocess_text(translated_text)
            translations.append(postprocessed_text)
        return translations

    async def detect_language(self, text):
        return detect(text)
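
A short sketch of driving the async Translate API from a plain script; the sample strings are illustrative:

import asyncio

from modules import Translate

async def main():
    translator = Translate()
    print(await translator.detect_language("hello world"))  # e.g. "en"
    print(await translator.batch_translate_text(["hello world"], source_lang="en"))

asyncio.run(main())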

43
modules/tts.py Normal file

@@ -0,0 +1,43 @@
import os

from vosk_tts import Model, Synth
from moviepy.editor import AudioFileClip, concatenate_audioclips


class TTS:
    def __init__(self, model_name="vosk-model-tts-ru-0.4-multi"):
        self.model = Model(model_name=model_name)
        self.synth = Synth(self.model)

    def text_to_speech(self, text, output_path, speaker_id=1):
        self.synth.synth(text, output_path, speaker_id)

    def batch_text_to_speech(self, text_list, output_folder):
        for text, speaker_id in text_list:
            output_path = f"{output_folder}/output_{speaker_id}.wav"
            self.text_to_speech(text, output_path, speaker_id)

    @staticmethod
    def create_text_speaker_tuples(final_result, vad_timing):
        # Pair the recognized text with a voice, alternating between speakers
        # 1 and 4 across consecutive (start, end) VAD segments
        text_speaker_tuples = []
        current_speaker_id = 1
        for _start, _end in zip(vad_timing[::2], vad_timing[1::2]):
            text_speaker_tuples.append((final_result, current_speaker_id))
            current_speaker_id = 4 if current_speaker_id == 1 else 1
        return text_speaker_tuples

    @staticmethod
    def create_output_folder(folder_name):
        os.makedirs(folder_name, exist_ok=True)

    def set_model(self, model_name):
        # vosk_tts.Model has no set_string; load a fresh model instead
        self.model = Model(model_name=model_name)
        self.synth = Synth(self.model)

    def change_speaker_id(self, text_speaker_tuples, new_speaker_id):
        return [(text, new_speaker_id) for text, _ in text_speaker_tuples]

    def combine_audio_files(self, audio_paths, output_path):
        # concatenate_audioclips was used without being imported above
        clips = [AudioFileClip(audio_path) for audio_path in audio_paths]
        combined_clip = concatenate_audioclips(clips)
        combined_clip.write_audiofile(output_path)
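
A sketch of wiring VAD boundaries into the speaker-alternation helper; the timings, text, and output paths are made-up examples:

from modules import TTS

tts = TTS()
TTS.create_output_folder("output")
# Two (start, end) segments, so the speaker alternates 1, 4
tuples = TTS.create_text_speaker_tuples("privet mir", [0.0, 1.2, 2.5, 3.9])
tts.batch_text_to_speech(tuples, output_folder="output")
tts.combine_audio_files(["output/output_1.wav", "output/output_4.wav"],
                        "output/combined.wav")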

76
modules/video.py Normal file

@@ -0,0 +1,76 @@
import os
import subprocess

import numpy as np
from moviepy.editor import (VideoFileClip, AudioFileClip, AudioArrayClip,
                            concatenate_audioclips, concatenate_videoclips)


class Video:
    def __init__(self):
        pass

    async def download_video_from_url(self, video_url, output_path):
        # Download a video from a given URL and save it to the specified output path
        # Note: a library like youtube_dl may be used for more advanced downloading
        command = ["ffmpeg", "-i", video_url, "-c", "copy", output_path]
        subprocess.run(command, check=True)

    async def load_video_from_path(self, video_path):
        # Load a video from the specified file path using moviepy
        return VideoFileClip(video_path)

    async def replace_audio_in_range(self, video_clip, audio_path, start_time, end_time):
        original_audio = video_clip.audio
        new_audio_clip = AudioFileClip(audio_path).subclip(start_time, end_time)
        original_duration = original_audio.duration
        if end_time > original_duration:
            # Pad with silence; AudioArrayClip expects a 2-D (samples, channels) array
            silence_duration = end_time - original_duration
            silence = AudioArrayClip(
                np.zeros((int(silence_duration * original_audio.fps),
                          original_audio.nchannels)),
                fps=original_audio.fps)
            new_audio_clip = concatenate_audioclips([new_audio_clip, silence])
        # concatenate_audioclips is a moviepy function, not a method of this class;
        # clamp the tail start so it never exceeds the original duration
        tail_start = min(end_time, original_duration)
        final_audio_clip = concatenate_audioclips([
            original_audio.subclip(0, start_time),
            new_audio_clip,
            original_audio.subclip(tail_start, original_duration)
        ])
        return video_clip.set_audio(final_audio_clip)

    @staticmethod
    async def save_video(video_clip, output_path):
        video_clip.write_videofile(output_path, codec="libx264", audio_codec="aac")

    @staticmethod
    async def get_video_duration(video_clip):
        return video_clip.duration

    @staticmethod
    async def create_output_folder(folder_name):
        os.makedirs(folder_name, exist_ok=True)

    async def concatenate_video_clips(self, video_clips):
        # Concatenate a list of video clips into a single clip
        if not video_clips:
            raise ValueError("Empty list of video clips")
        # All clips must share resolution and fps for a clean concatenation
        first_clip = video_clips[0]
        for clip in video_clips[1:]:
            if clip.size != first_clip.size or clip.fps != first_clip.fps:
                raise ValueError("All video clips must have the same resolution and fps for concatenation")
        return concatenate_videoclips(video_clips)
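
The Video methods are coroutines, so a caller outside FastAPI needs its own event loop; a minimal sketch with illustrative paths:

import asyncio

from modules import Video

async def main():
    vm = Video()
    clip = await vm.load_video_from_path("samples/demo.mp4")
    # Overlay dubbed audio onto the first two seconds
    clip = await vm.replace_audio_in_range(clip, "output/output_1.wav", 0.0, 2.0)
    await vm.save_video(clip, "video/demo_processed.mp4")

asyncio.run(main())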

57
start.py Normal file

@@ -0,0 +1,57 @@
import json
import os
import shutil

from fastapi import FastAPI, File, UploadFile
from fastapi.responses import FileResponse

from modules import SST, TTS, Translate, Video

app = FastAPI()

UPLOAD_FOLDER = "uploads"
OUTPUT_FOLDER = "output"
AUDIO_FOLDER = "audio"
VIDEO_FOLDER = "video"
for folder in (UPLOAD_FOLDER, OUTPUT_FOLDER, AUDIO_FOLDER, VIDEO_FOLDER):
    os.makedirs(folder, exist_ok=True)

sst = SST()  # with no model_path, vosk downloads the default en-us model
tts = TTS()
translator = Translate()
video_manager = Video()


@app.post("/process_video/")
async def process_video(video_file: UploadFile = File(...)):
    video_path = os.path.join(UPLOAD_FOLDER, video_file.filename)
    with open(video_path, "wb") as video:
        video.write(video_file.file.read())

    # SST is synchronous, and Video has no extract_audio method;
    # SST.process_video extracts the audio track itself
    final_result, vad_timing = sst.process_video(video_path)
    # Vosk returns a JSON string; pass only the recognized text to the translator
    recognized_text = json.loads(final_result).get("text", "")
    translated_text = await translator.translate_text(recognized_text, source_lang="en", target_lang="ru")

    text_speaker_tuples = [(translated_text, 1)]
    tts.batch_text_to_speech(text_speaker_tuples, output_folder=OUTPUT_FOLDER)

    output_video_path = os.path.join(VIDEO_FOLDER, f"{os.path.splitext(video_file.filename)[0]}_processed.mp4")
    video_clip = await video_manager.load_video_from_path(video_path)
    for start, end in zip(vad_timing[::2], vad_timing[1::2]):
        # replace_audio_in_range returns a new clip, so keep the result
        video_clip = await video_manager.replace_audio_in_range(
            video_clip, os.path.join(OUTPUT_FOLDER, "output_1.wav"), start, end)
    await video_manager.save_video(video_clip, output_video_path)

    for folder in (UPLOAD_FOLDER, AUDIO_FOLDER, OUTPUT_FOLDER):
        shutil.rmtree(folder)
        os.makedirs(folder, exist_ok=True)

    return FileResponse(output_video_path, media_type="video/mp4",
                        filename=os.path.basename(output_video_path))
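
A client-side check of the endpoint; the host and port assume a default local run (e.g. uvicorn start:app --port 8000), and the sample file name is made up:

import requests

with open("samples/demo.mp4", "rb") as f:
    resp = requests.post("http://127.0.0.1:8000/process_video/",
                         files={"video_file": ("demo.mp4", f, "video/mp4")})
with open("demo_processed.mp4", "wb") as out:
    out.write(resp.content)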

0
templates/root.html Normal file