Source code for camel.toolkits.audio_analysis_toolkit
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
import os
import uuid
from typing import List, Optional
from urllib.parse import urlparse

import requests

from camel.logger import get_logger
from camel.messages import BaseMessage
from camel.models import BaseAudioModel, BaseModelBackend, OpenAIAudioModels
from camel.toolkits.base import BaseToolkit
from camel.toolkits.function_tool import FunctionTool

logger = get_logger(__name__)


def download_file(url: str, cache_dir: str) -> str:
    r"""Download a file from a URL to a local cache directory.

    Args:
        url (str): The URL of the file to download.
        cache_dir (str): The directory to save the downloaded file.

    Returns:
        str: The path to the downloaded file.

    Raises:
        Exception: If the download fails.
    """
    # Create cache directory if it doesn't exist
    os.makedirs(cache_dir, exist_ok=True)

    # Extract filename from URL or generate a unique one
    parsed_url = urlparse(url)
    filename = os.path.basename(parsed_url.path)

    if not filename:
        # Generate a unique filename if none is provided in the URL
        file_ext = ".mp3"  # Default extension
        content_type = None

        # Try to get the file extension from the content type
        try:
            response = requests.head(url)
            content_type = response.headers.get('Content-Type', '')
            if 'audio/wav' in content_type:
                file_ext = '.wav'
            elif 'audio/mpeg' in content_type:
                file_ext = '.mp3'
            elif 'audio/ogg' in content_type:
                file_ext = '.ogg'
        except Exception:
            pass

        filename = f"{uuid.uuid4()}{file_ext}"

    local_path = os.path.join(cache_dir, filename)

    # Download the file
    response = requests.get(url, stream=True)
    response.raise_for_status()

    with open(local_path, 'wb') as f:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)

    logger.debug(f"Downloaded file from {url} to {local_path}")
    return local_path
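

# Illustrative usage sketch (comment only, not part of the module API): the
# URL and cache directory below are placeholders. `download_file` fetches the
# remote file into `cache_dir` and returns the local path.
#
#     local_path = download_file(
#         "https://example.com/sample.mp3", cache_dir="tmp/"
#     )
#     # local_path is something like "tmp/sample.mp3"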


class AudioAnalysisToolkit(BaseToolkit):
    r"""A toolkit for audio processing and analysis.

    This class provides methods for processing, transcribing, and extracting
    information from audio data, including direct question answering about
    audio content.

    Args:
        cache_dir (Optional[str]): Directory path for caching downloaded
            audio files. If not provided, 'tmp/' will be used.
            (default: :obj:`None`)
        transcribe_model (Optional[BaseAudioModel]): Model used for audio
            transcription. If not provided, OpenAIAudioModels will be used.
            (default: :obj:`None`)
        audio_reasoning_model (Optional[BaseModelBackend]): Model used for
            audio reasoning and question answering. If not provided, uses
            the default model from ChatAgent. (default: :obj:`None`)
    """

    def __init__(
        self,
        cache_dir: Optional[str] = None,
        transcribe_model: Optional[BaseAudioModel] = None,
        audio_reasoning_model: Optional[BaseModelBackend] = None,
    ):
        self.cache_dir = 'tmp/'
        if cache_dir:
            self.cache_dir = cache_dir

        if transcribe_model:
            self.transcribe_model = transcribe_model
        else:
            self.transcribe_model = OpenAIAudioModels()
            logger.warning(
                "No audio transcription model provided. "
                "Using OpenAIAudioModels."
            )

        from camel.agents import ChatAgent

        if audio_reasoning_model:
            self.audio_agent = ChatAgent(model=audio_reasoning_model)
        else:
            self.audio_agent = ChatAgent()
            logger.warning(
                "No audio reasoning model provided. Using default model in"
                " ChatAgent."
            )

    def audio2text(self, audio_path: str) -> str:
        r"""Transcribe audio to text.

        Args:
            audio_path (str): The path to the audio file or URL.

        Returns:
            str: The transcribed text.
        """
        logger.debug(
            f"Calling audio2text method for audio file `{audio_path}`."
        )
        try:
            audio_transcript = self.transcribe_model.speech_to_text(
                audio_path
            )
            if not audio_transcript:
                logger.warning("Audio transcription returned empty result")
                return "No audio transcription available."
            return audio_transcript
        except Exception as e:
            logger.error(f"Audio transcription failed: {e}")
            return "Audio transcription failed."

    def ask_question_about_audio(self, audio_path: str, question: str) -> str:
        r"""Ask a question about the audio and get the answer using a
        multimodal model.

        Args:
            audio_path (str): The path to the audio file or URL.
            question (str): The question to ask about the audio.

        Returns:
            str: The answer to the question.
        """
        logger.debug(
            f"Calling ask_question_about_audio method for audio file "
            f"`{audio_path}` and question `{question}`."
        )
        parsed_url = urlparse(audio_path)
        is_url = all([parsed_url.scheme, parsed_url.netloc])
        local_audio_path = audio_path

        # If the audio is a URL, download it first
        if is_url:
            try:
                local_audio_path = download_file(audio_path, self.cache_dir)
            except Exception as e:
                logger.error(f"Failed to download audio file: {e}")
                return f"Failed to download audio file: {e!s}"

        # Try direct audio question answering first
        try:
            # Check if the transcribe_model supports audio_question_answering
            if hasattr(self.transcribe_model, 'audio_question_answering'):
                logger.debug("Using direct audio question answering")
                response = self.transcribe_model.audio_question_answering(
                    local_audio_path, question
                )
                return response
        except Exception as e:
            logger.warning(
                f"Direct audio question answering failed: {e}. "
                "Falling back to transcription-based approach."
            )

        # Fallback to transcription-based approach
        try:
            transcript = self.audio2text(local_audio_path)
            reasoning_prompt = f"""
            <speech_transcription_result>{transcript}</speech_transcription_result>

            Please answer the following question based on the speech
            transcription result above:

            <question>{question}</question>
            """
            msg = BaseMessage.make_user_message(
                role_name="User", content=reasoning_prompt
            )
            response = self.audio_agent.step(msg)

            if not response or not response.msgs:
                logger.error("Model returned empty response")
                return (
                    "Failed to generate an answer. "
                    "The model returned an empty response."
                )

            answer = response.msgs[0].content
            return answer
        except Exception as e:
            logger.error(f"Audio question answering failed: {e}")
            return f"Failed to answer question about audio: {e!s}"

    def get_tools(self) -> List[FunctionTool]:
        r"""Returns a list of FunctionTool objects representing the
        functions in the toolkit.

        Returns:
            List[FunctionTool]: A list of FunctionTool objects representing
                the functions in the toolkit.
        """
        return [
            FunctionTool(self.ask_question_about_audio),
            FunctionTool(self.audio2text),
        ]
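

# Usage sketch (illustrative, not part of the toolkit): assumes credentials
# for the default OpenAIAudioModels backend and ChatAgent are configured, and
# the audio path below is a placeholder.
if __name__ == "__main__":
    toolkit = AudioAnalysisToolkit(cache_dir="tmp/")

    # Plain transcription of a local file or URL.
    print(toolkit.audio2text("tmp/sample.mp3"))

    # Question answering: direct audio QA is tried first, then the
    # transcription + reasoning-agent fallback.
    print(
        toolkit.ask_question_about_audio(
            "tmp/sample.mp3", "How many speakers are in the recording?"
        )
    )

    # Expose both methods as FunctionTool objects for agent tool calling.
    print(toolkit.get_tools())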