# Source code for camel.toolkits.video_analysis_toolkit

# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========

import os
import tempfile
from pathlib import Path
from typing import List, Optional

from PIL import Image

from camel.logger import get_logger
from camel.messages import BaseMessage
from camel.models import BaseModelBackend, OpenAIAudioModels
from camel.toolkits.base import BaseToolkit
from camel.toolkits.function_tool import FunctionTool
from camel.utils import dependencies_required

from .video_download_toolkit import (
    VideoDownloaderToolkit,
    _capture_screenshot,
)

# Module-level logger named after this module.
logger = get_logger(__name__)

# Prompt template for video question answering. It contains two
# `str.format` placeholders, `{audio_transcription}` and `{question}`.
# A trailing backslash inside the literal joins the next source line
# without inserting a newline into the prompt text.
VIDEO_QA_PROMPT = """
Analyze the provided video frames and corresponding audio transcription to \
answer the given question(s) thoroughly and accurately.

Instructions:
    1. Visual Analysis:
        - Examine the video frames to identify visible entities.
        - Differentiate objects, species, or features based on key attributes \
such as size, color, shape, texture, or behavior.
        - Note significant groupings, interactions, or contextual patterns \
relevant to the analysis.

    2. Audio Integration:
        - Use the audio transcription to complement or clarify your visual \
observations.
        - Identify names, descriptions, or contextual hints in the \
transcription that help confirm or refine your visual analysis.

    3. Detailed Reasoning and Justification:
        - Provide a brief explanation of how you identified and distinguished \
each species or object.
        - Highlight specific features or contextual clues that informed \
your reasoning.

    4. Comprehensive Answer:
        - Specify the total number of distinct species or object types \
identified in the video.
        - Describe the defining characteristics and any supporting evidence \
from the video and transcription.

    5. Important Considerations:
        - Pay close attention to subtle differences that could distinguish \
similar-looking species or objects 
          (e.g., juveniles vs. adults, closely related species).
        - Provide concise yet complete explanations to ensure clarity.

**Audio Transcription:**
{audio_transcription}

**Question:**
{question}
"""


class VideoAnalysisToolkit(BaseToolkit):
    r"""A class for analysing videos with vision-language model.

    Args:
        download_directory (Optional[str], optional): The directory where the
            video will be downloaded to. If not provided, video will be stored
            in a temporary directory and will be cleaned up after use.
            (default: :obj:`None`)
        model (Optional[BaseModelBackend], optional): The model to use for
            visual analysis. (default: :obj:`None`)
        use_audio_transcription (bool, optional): Whether to enable audio
            transcription using OpenAI's audio models. Requires a valid
            OpenAI API key. When disabled, video analysis will be based
            solely on visual content. (default: :obj:`False`)
    """

    @dependencies_required("ffmpeg", "scenedetect")
    def __init__(
        self,
        download_directory: Optional[str] = None,
        model: Optional[BaseModelBackend] = None,
        use_audio_transcription: bool = False,
    ) -> None:
        # A temporary directory is created whenever `download_directory` is
        # falsy (None *or* an empty string), so the cleanup flag must use the
        # same test; otherwise an empty string leaks the temporary directory.
        self._cleanup = not download_directory
        self._temp_files: List[str] = []  # Track temporary files for cleanup
        self._use_audio_transcription = use_audio_transcription

        self._download_directory = Path(
            download_directory or tempfile.mkdtemp()
        ).resolve()

        self.video_downloader_toolkit = VideoDownloaderToolkit(
            download_directory=str(self._download_directory)
        )

        try:
            self._download_directory.mkdir(parents=True, exist_ok=True)
        except FileExistsError as e:
            # Raised when the path exists but is not a directory.
            raise ValueError(
                f"{self._download_directory} is not a valid directory."
            ) from e
        except OSError as e:
            raise ValueError(
                f"Error creating directory {self._download_directory}: {e}"
            ) from e

        logger.info(f"Video will be downloaded to {self._download_directory}")

        self.vl_model = model
        # Import ChatAgent at runtime to avoid circular imports
        from camel.agents import ChatAgent

        if self.vl_model:
            self.vl_agent = ChatAgent(model=self.vl_model)
        else:
            # If no model is provided, use default model in ChatAgent
            self.vl_agent = ChatAgent()
            logger.warning(
                "No vision-language model provided. Using default model in"
                " ChatAgent."
            )

        # Initialize audio models only if audio transcription is enabled
        self.audio_models = None
        if self._use_audio_transcription:
            try:
                self.audio_models = OpenAIAudioModels()
            except Exception as e:
                # Best effort: fall back to visual-only analysis.
                logger.warning(
                    f"Failed to initialize OpenAIAudioModels: {e}. "
                    "Audio transcription will be disabled."
                )
                self._use_audio_transcription = False

    def __del__(self):
        r"""Clean up temporary directories and files when the object is
        destroyed.
        """
        # `__del__` also runs when `__init__` raised before these attributes
        # were assigned, so read them defensively.
        temp_files = getattr(self, "_temp_files", None) or []
        download_directory = getattr(self, "_download_directory", None)
        cleanup = getattr(self, "_cleanup", False)

        # Clean up temporary files
        for temp_file in temp_files:
            if os.path.exists(temp_file):
                try:
                    os.remove(temp_file)
                    logger.debug(f"Removed temporary file: {temp_file}")
                except OSError as e:
                    logger.warning(
                        f"Failed to remove temporary file {temp_file}: {e}"
                    )

        # Clean up temporary directory if needed
        if (
            cleanup
            and download_directory is not None
            and os.path.exists(download_directory)
        ):
            try:
                # Imports may fail while the interpreter is shutting down,
                # hence ImportError is handled alongside OSError.
                import shutil

                shutil.rmtree(download_directory)
                logger.debug(
                    f"Removed temporary directory: {download_directory}"
                )
            except (OSError, ImportError) as e:
                logger.warning(
                    f"Failed to remove temporary directory"
                    f" {download_directory}: {e}"
                )

    def _extract_audio_from_video(
        self, video_path: str, output_format: str = "mp3"
    ) -> str:
        r"""Extract audio from the video.

        The audio file is written next to the video, using the video path
        with its extension replaced by ``output_format``, and is registered
        for removal when this object is destroyed.

        Args:
            video_path (str): The path to the video file.
            output_format (str): The format of the audio file to be saved.
                (default: :obj:`"mp3"`)

        Returns:
            str: The path to the audio file.

        Raises:
            RuntimeError: If ffmpeg fails to extract the audio.
        """
        import ffmpeg

        # Handle case where video file doesn't have an extension
        base_path = os.path.splitext(video_path)[0]
        output_path = f"{base_path}.{output_format}"

        # Only overwrite files this toolkit produced itself. Without this, a
        # second call on the same video fails because the audio file from
        # the first call still exists; files the toolkit did not create are
        # never overwritten.
        already_tracked = output_path in self._temp_files

        try:
            # NOTE(review): the codec is always libmp3lame, whatever
            # `output_format` is — confirm this is intended for non-mp3
            # formats.
            (
                ffmpeg.input(video_path)
                .output(output_path, vn=None, acodec="libmp3lame")
                .run(quiet=True, overwrite_output=already_tracked)
            )
        except ffmpeg.Error as e:
            error_message = f"FFmpeg-Python failed: {e}"
            logger.error(error_message)
            raise RuntimeError(error_message) from e

        # Track the audio file for cleanup (once per path)
        if not already_tracked:
            self._temp_files.append(output_path)
        return output_path

    def _transcribe_audio(self, audio_path: str) -> str:
        r"""Transcribe the audio of the video.

        Args:
            audio_path (str): The path to the audio file.

        Returns:
            str: The transcript, or a fixed placeholder message when
                transcription is disabled, unavailable, empty, or fails.
        """
        # Check if audio transcription is enabled and audio models are
        # available
        if not self._use_audio_transcription or self.audio_models is None:
            logger.warning("Audio transcription is disabled or not available")
            return "No audio transcription available."

        try:
            audio_transcript = self.audio_models.speech_to_text(audio_path)
        except Exception as e:
            # Transcription is optional; report the failure in the prompt
            # text instead of aborting the whole analysis.
            logger.error(f"Audio transcription failed: {e}")
            return "Audio transcription failed."

        if not audio_transcript:
            logger.warning("Audio transcription returned empty result")
            return "No audio transcription available."
        return audio_transcript

    def _extract_keyframes(
        self, video_path: str, num_frames: int, threshold: float = 25.0
    ) -> List[Image.Image]:
        r"""Extract keyframes from a video based on scene changes and return
        them as PIL.Image.Image objects.

        If no scene is detected, frames are sampled at regular time
        intervals instead.

        Args:
            video_path (str): Path to the video file.
            num_frames (int): Number of keyframes to extract. Non-positive
                values are replaced by 1.
            threshold (float): The threshold value for scene change
                detection. (default: :obj:`25.0`)

        Returns:
            list: A list of PIL.Image.Image objects representing
                the extracted keyframes.

        Raises:
            ValueError: If no keyframe could be extracted.
        """
        from scenedetect import (  # type: ignore[import-untyped]
            SceneManager,
            VideoManager,
        )
        from scenedetect.detectors import (  # type: ignore[import-untyped]
            ContentDetector,
        )

        if num_frames <= 0:
            logger.warning(
                f"Invalid num_frames: {num_frames}, using default of 1"
            )
            num_frames = 1

        video_manager = VideoManager([video_path])
        scene_manager = SceneManager()
        scene_manager.add_detector(ContentDetector(threshold=threshold))

        try:
            video_manager.set_duration()
            video_manager.start()
            scene_manager.detect_scenes(video_manager)
            scenes = scene_manager.get_scene_list()
        finally:
            # Always free the underlying video handle, even on failure.
            video_manager.release()

        keyframes: List[Image.Image] = []

        # Handle case where no scenes are detected
        if not scenes:
            logger.warning(
                "No scenes detected in video, capturing frames at "
                "regular intervals"
            )
            import cv2

            # The capture is only needed for the metadata, so release it
            # before (and regardless of) the frame extraction below.
            cap = cv2.VideoCapture(video_path)
            try:
                total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                fps = cap.get(cv2.CAP_PROP_FPS)
            finally:
                cap.release()

            duration = total_frames / fps if fps > 0 else 0

            if duration > 0 and total_frames > 0:
                # Extract frames at regular intervals
                sample_count = min(num_frames, total_frames)
                interval = duration / sample_count
                for i in range(sample_count):
                    time_sec = i * interval
                    frame = _capture_screenshot(video_path, time_sec)
                    keyframes.append(frame)
        else:
            # Extract one frame at the start of each detected scene, up to
            # `num_frames` frames.
            for start_time, _ in scenes[:num_frames]:
                frame = _capture_screenshot(video_path, start_time)
                keyframes.append(frame)

        if not keyframes:
            logger.error("Failed to extract any keyframes from video")
            raise ValueError("Failed to extract keyframes from video")

        logger.info(f"Extracted {len(keyframes)} keyframes")
        return keyframes

    def ask_question_about_video(
        self,
        video_path: str,
        question: str,
        num_frames: int = 28,
    ) -> str:
        r"""Ask a question about the video.

        Args:
            video_path (str): The path to the video file.
                It can be a local file or a URL (such as Youtube website).
            question (str): The question to ask about the video.
            num_frames (int): The number of frames to extract from the video.
                To be adjusted based on the length of the video.
                (default: :obj:`28`)

        Returns:
            str: The answer to the question. Failures during processing are
                not raised; they are returned as a string starting with
                ``"Error: "``.

        Raises:
            ValueError: If ``question`` is empty.
        """
        from urllib.parse import urlparse

        if not question:
            raise ValueError("Question cannot be empty")

        if num_frames <= 0:
            logger.warning(
                f"Invalid num_frames: {num_frames}, using default of 28"
            )
            num_frames = 28

        # Treat the input as a URL only when both scheme and host are present.
        parsed_url = urlparse(video_path)
        is_url = bool(parsed_url.scheme and parsed_url.netloc)

        try:
            if is_url:
                downloaded_video_path = (
                    self.video_downloader_toolkit.download_video(video_path)
                )
                if not downloaded_video_path or not os.path.exists(
                    downloaded_video_path
                ):
                    raise ValueError(
                        f"Failed to download video from {video_path}"
                    )
                video_path = downloaded_video_path

            if not os.path.exists(video_path):
                raise FileNotFoundError(f"Video file not found: {video_path}")

            audio_transcript = "No audio transcription available."
            if self._use_audio_transcription:
                audio_path = self._extract_audio_from_video(video_path)
                audio_transcript = self._transcribe_audio(audio_path)

            video_frames = self._extract_keyframes(video_path, num_frames)
            prompt = VIDEO_QA_PROMPT.format(
                audio_transcription=audio_transcript,
                question=question,
            )

            msg = BaseMessage.make_user_message(
                role_name="User",
                content=prompt,
                image_list=video_frames,
            )

            response = self.vl_agent.step(msg)
            if not response or not response.msgs:
                logger.error("Model returned empty response")
                return (
                    "Failed to generate an answer. "
                    "The model returned an empty response."
                )

            return response.msgs[0].content

        except Exception as e:
            # Tool boundary: report the failure as text rather than raising.
            error_message = f"Error processing video: {e!s}"
            logger.error(error_message)
            return f"Error: {error_message}"

    def get_tools(self) -> List[FunctionTool]:
        r"""Returns a list of FunctionTool objects representing the
        functions in the toolkit.

        Returns:
            List[FunctionTool]: A list of FunctionTool objects representing
                the functions in the toolkit.
        """
        return [FunctionTool(self.ask_question_about_video)]