Source code for camel.models.openai_audio_models

# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
import os
from typing import Any, List, Optional, Union

from openai import OpenAI, _legacy_response

from camel.types import AudioModelType, VoiceType


class OpenAIAudioModels:
    r"""Provides access to OpenAI's Text-to-Speech (TTS) and Speech_to_Text
    (STT) models."""

    def __init__(
        self,
        api_key: Optional[str] = None,
        url: Optional[str] = None,
    ) -> None:
        r"""Initialize an instance of OpenAI.

        Args:
            api_key (Optional[str], optional): API key for the client. When
                not given, the `OPENAI_API_KEY` environment variable is
                used. Defaults to `None`.
            url (Optional[str], optional): Base URL for the client. When not
                given, the `OPENAI_API_BASE_URL` environment variable is
                used. Defaults to `None`.
        """
        self._url = url or os.environ.get("OPENAI_API_BASE_URL")
        self._api_key = api_key or os.environ.get("OPENAI_API_KEY")
        self._client = OpenAI(
            timeout=120,
            max_retries=3,
            base_url=self._url,
            api_key=self._api_key,
        )

    @staticmethod
    def _split_text(text: str, max_chunk_size: int) -> List[str]:
        r"""Split text into chunks of at most `max_chunk_size` characters.

        Each chunk ends right after the last period found inside the size
        limit. If the window holds no period at all, the text is cut exactly
        at the limit so that the loop always makes progress.

        Args:
            text (str): The text to split.
            max_chunk_size (int): Maximum number of characters per chunk.

        Returns:
            List[str]: The non-empty chunks, in their original order.
        """
        chunks = []
        remaining = text
        while remaining:
            if len(remaining) <= max_chunk_size:
                chunks.append(remaining)
                break
            # The limit is evaluated afresh for every chunk; `rfind` returns
            # -1 when there is no period, which makes `cut` equal to 0.
            cut = remaining.rfind('.', 0, max_chunk_size) + 1
            if cut == 0:
                cut = max_chunk_size
            chunks.append(remaining[:cut])
            remaining = remaining[cut:].lstrip()
        return chunks

    def text_to_speech(
        self,
        input: str,
        model_type: AudioModelType = AudioModelType.TTS_1,
        voice: VoiceType = VoiceType.ALLOY,
        storage_path: Optional[str] = None,
        **kwargs: Any,
    ) -> Union[
        List[_legacy_response.HttpxBinaryResponseContent],
        _legacy_response.HttpxBinaryResponseContent,
    ]:
        r"""Convert text to speech using OpenAI's TTS model.

        This method converts the given input text to speech using the
        specified model and voice. Input longer than the internal chunk
        limit (4095 characters) is split into several requests.

        Args:
            input (str): The text to be converted to speech.
            model_type (AudioModelType, optional): The TTS model to use.
                Defaults to `AudioModelType.TTS_1`.
            voice (VoiceType, optional): The voice to be used for generating
                speech. Defaults to `VoiceType.ALLOY`.
            storage_path (str, optional): The local path to store the
                generated speech file if provided. When the input is split,
                chunk `i` is written to `<name>_<i><extension>` instead.
                Defaults to `None`.
            **kwargs (Any): Extra kwargs passed to the TTS API.

        Returns:
            Union[List[_legacy_response.HttpxBinaryResponseContent],
                _legacy_response.HttpxBinaryResponseContent]: List of
                response content objects from OpenAI if the input had to be
                split into chunks, a single response content object
                otherwise.

        Raises:
            Exception: If there's an error during the TTS API call or while
                writing the audio file; the original error is chained as
                the cause.
        """
        try:
            # Model only support at most 4096 characters one time.
            max_chunk_size = 4095

            if len(input) <= max_chunk_size:
                response = self._client.audio.speech.create(
                    model=model_type.value,
                    voice=voice.value,
                    input=input,
                    **kwargs,
                )
                if storage_path:
                    try:
                        response.write_to_file(storage_path)
                    except Exception as e:
                        raise Exception("Error during write the file") from e
                return response

            audio_chunks = []
            text_chunks = self._split_text(input, max_chunk_size)
            for chunk_index, chunk in enumerate(text_chunks):
                response = self._client.audio.speech.create(
                    model=model_type.value,
                    voice=voice.value,
                    input=chunk,
                    **kwargs,
                )
                if storage_path:
                    try:
                        # Create a new storage path for each chunk
                        file_name, file_extension = os.path.splitext(
                            storage_path
                        )
                        new_storage_path = (
                            f"{file_name}_{chunk_index}{file_extension}"
                        )
                        response.write_to_file(new_storage_path)
                    except Exception as e:
                        raise Exception(
                            "Error during writing the file"
                        ) from e
                audio_chunks.append(response)
            return audio_chunks
        except Exception as e:
            raise Exception("Error during TTS API call") from e

    def _split_audio(
        self, audio_file_path: str, chunk_size_mb: int = 24
    ) -> list:
        r"""Split the audio file into smaller chunks. Since the Whisper API
        only supports files that are less than 25 MB.

        The number of chunks is derived from the size of the file on disk,
        and the audio is then divided into that many pieces of equal
        duration. The chunks are exported, in the same format as the input,
        into a `<audio file name>_chunks` directory next to the input file.

        Args:
            audio_file_path (str): Path to the input audio file.
            chunk_size_mb (int, optional): Size of each chunk in megabytes.
                Defaults to `24`.

        Returns:
            list: List of paths to the split audio files.
        """
        from pydub import AudioSegment

        audio = AudioSegment.from_file(audio_file_path)
        base_path, extension = os.path.splitext(audio_file_path)
        audio_format = extension[1:].lower()

        # Calculate chunk size in bytes
        chunk_size_bytes = chunk_size_mb * 1024 * 1024

        # Number of chunks needed
        num_chunks = os.path.getsize(audio_file_path) // chunk_size_bytes + 1

        # Create a directory to store the chunks
        output_dir = base_path + "_chunks"
        os.makedirs(output_dir, exist_ok=True)

        # Get audio chunk len in milliseconds
        chunk_size_milliseconds = len(audio) // (num_chunks)

        # Split the audio into chunks
        split_files = []
        for i in range(num_chunks):
            start = i * chunk_size_milliseconds
            end = (i + 1) * chunk_size_milliseconds
            if i + 1 == num_chunks:
                # The last chunk takes whatever the integer division left
                chunk = audio[start:]
            else:
                chunk = audio[start:end]
            # Create new chunk path
            chunk_path = os.path.join(output_dir, f"chunk_{i}.{audio_format}")
            chunk.export(chunk_path, format=audio_format)
            split_files.append(chunk_path)
        return split_files

    def _transcribe_file(
        self,
        file_path: str,
        translate_into_english: bool,
        api_kwargs: dict,
    ) -> str:
        r"""Send one audio file to the STT API and return the resulting text.

        The file is opened in a `with` block so that its handle is closed as
        soon as the request finishes, whether it succeeded or raised.

        Args:
            file_path (str): Path to the audio file to send.
            translate_into_english (bool): Use the translation endpoint when
                `True`, the transcription endpoint otherwise.
            api_kwargs (dict): Extra keyword arguments passed to the API
                call.

        Returns:
            str: The `text` attribute of the API response.
        """
        with open(file_path, "rb") as audio_data:
            if translate_into_english:
                result = self._client.audio.translations.create(
                    model="whisper-1", file=audio_data, **api_kwargs
                )
            else:
                result = self._client.audio.transcriptions.create(
                    model="whisper-1", file=audio_data, **api_kwargs
                )
        return result.text

    def speech_to_text(
        self,
        audio_file_path: str,
        translate_into_english: bool = False,
        **kwargs: Any,
    ) -> str:
        r"""Convert speech audio to text.

        Files larger than 24 MB are split into chunks first; the text of
        all chunks is joined with single spaces.

        Args:
            audio_file_path (str): The audio file path, supporting one of
                these formats: flac, mp3, mp4, mpeg, mpga, m4a, ogg, wav, or
                webm.
            translate_into_english (bool, optional): Whether to translate the
                speech into English. Defaults to `False`.
            **kwargs (Any): Extra keyword arguments passed to the
                Speech-to-Text (STT) API.

        Returns:
            str: The output text.

        Raises:
            ValueError: If the audio file format is not supported.
            Exception: If there's an error during the STT API call.
        """
        supported_formats = [
            "flac",
            "mp3",
            "mp4",
            "mpeg",
            "mpga",
            "m4a",
            "ogg",
            "wav",
            "webm",
        ]
        file_format = audio_file_path.split(".")[-1].lower()

        if file_format not in supported_formats:
            raise ValueError(f"Unsupported audio file format: {file_format}")

        try:
            if os.path.getsize(audio_file_path) > 24 * 1024 * 1024:
                # Split audio into chunks
                audio_chunks = self._split_audio(audio_file_path)
                texts = []
                try:
                    for chunk_path in audio_chunks:
                        texts.append(
                            self._transcribe_file(
                                chunk_path, translate_into_english, kwargs
                            )
                        )
                finally:
                    # Delete temporary chunk files, including the ones that
                    # were never processed because an earlier call failed
                    for chunk_path in audio_chunks:
                        if os.path.exists(chunk_path):
                            os.remove(chunk_path)
                return " ".join(texts)

            # Process the entire audio file
            return self._transcribe_file(
                audio_file_path, translate_into_english, kwargs
            )
        except Exception as e:
            raise Exception("Error during STT API call") from e