Source code for camel.toolkits.video_analysis_toolkit

# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========

import io
import os
import tempfile
from pathlib import Path
from typing import List, Optional

from PIL import Image

from camel.logger import get_logger
from camel.messages import BaseMessage
from camel.models import BaseModelBackend, OpenAIAudioModels
from camel.toolkits.base import BaseToolkit
from camel.toolkits.function_tool import FunctionTool
from camel.utils import MCPServer, dependencies_required

from .video_download_toolkit import (
    VideoDownloaderToolkit,
    _capture_screenshot,
)

logger = get_logger(__name__)

VIDEO_QA_PROMPT = """
Analyze the provided video frames and corresponding audio transcription to \
answer the given question(s) thoroughly and accurately.

Instructions:
    1. Visual Analysis:
        - Examine the video frames to identify visible entities.
        - Differentiate objects, species, or features based on key attributes \
such as size, color, shape, texture, or behavior.
        - Note significant groupings, interactions, or contextual patterns \
relevant to the analysis.

    2. Audio Integration:
        - Use the audio transcription to complement or clarify your visual \
observations.
        - Identify names, descriptions, or contextual hints in the \
transcription that help confirm or refine your visual analysis.

    3. Detailed Reasoning and Justification:
        - Provide a brief explanation of how you identified and distinguished \
each species or object.
        - Highlight specific features or contextual clues that informed \
your reasoning.

    4. Comprehensive Answer:
        - Specify the total number of distinct species or object types \
identified in the video.
        - Describe the defining characteristics and any supporting evidence \
from the video and transcription.

    5. Important Considerations:
        - Pay close attention to subtle differences that could distinguish \
similar-looking species or objects 
          (e.g., juveniles vs. adults, closely related species).
        - Provide concise yet complete explanations to ensure clarity.

**Audio Transcription:**
{audio_transcription}

**Question:**
{question}
"""


@MCPServer()
class VideoAnalysisToolkit(BaseToolkit):
    r"""A class for analysing videos with a vision-language model.

    Args:
        download_directory (Optional[str], optional): The directory where the
            video will be downloaded to. If not provided, the video will be
            stored in a temporary directory and will be cleaned up after use.
            (default: :obj:`None`)
        model (Optional[BaseModelBackend], optional): The model to use for
            visual analysis. (default: :obj:`None`)
        use_audio_transcription (bool, optional): Whether to enable audio
            transcription using OpenAI's audio models. Requires a valid
            OpenAI API key. When disabled, video analysis will be based
            solely on visual content. (default: :obj:`False`)
        frame_interval (float, optional): Interval in seconds between frames
            to extract from the video. (default: :obj:`4.0`)
        output_language (str, optional): The language for output responses.
            (default: :obj:`"English"`)
        cookies_path (Optional[str]): The path to the cookies file for the
            video service in Netscape format. (default: :obj:`None`)
        timeout (Optional[float]): The timeout value for API requests in
            seconds. If None, no timeout is applied. (default: :obj:`None`)
    """

    @dependencies_required("ffmpeg", "scenedetect")
    def __init__(
        self,
        download_directory: Optional[str] = None,
        model: Optional[BaseModelBackend] = None,
        use_audio_transcription: bool = False,
        frame_interval: float = 4.0,
        output_language: str = "English",
        cookies_path: Optional[str] = None,
        timeout: Optional[float] = None,
    ) -> None:
        super().__init__(timeout=timeout)
        self._cleanup = download_directory is None
        self._temp_files: list[str] = []  # Track temporary files for cleanup
        self._use_audio_transcription = use_audio_transcription
        self.output_language = output_language
        self.frame_interval = frame_interval

        self._download_directory = Path(
            download_directory or tempfile.mkdtemp()
        ).resolve()

        self.video_downloader_toolkit = VideoDownloaderToolkit(
            download_directory=str(self._download_directory),
            cookies_path=cookies_path,
        )

        try:
            self._download_directory.mkdir(parents=True, exist_ok=True)
        except OSError as e:
            raise ValueError(
                f"Error creating directory {self._download_directory}: {e}"
            )

        logger.info(f"Video will be downloaded to {self._download_directory}")

        self.vl_model = model
        # Ensure ChatAgent is initialized with a model if provided
        if self.vl_model:
            # Import ChatAgent at runtime to avoid circular imports
            from camel.agents import ChatAgent

            self.vl_agent = ChatAgent(
                model=self.vl_model, output_language=self.output_language
            )
        else:
            # If no model is provided, use default model in ChatAgent
            # Import ChatAgent at runtime to avoid circular imports
            from camel.agents import ChatAgent

            self.vl_agent = ChatAgent(output_language=self.output_language)
            logger.warning(
                "No vision-language model provided. Using default model in "
                "ChatAgent."
            )

        # Initialize audio models only if audio transcription is enabled
        self.audio_models = None
        if self._use_audio_transcription:
            try:
                self.audio_models = OpenAIAudioModels()
            except Exception as e:
                logger.warning(
                    f"Failed to initialize OpenAIAudioModels: {e}. "
                    "Audio transcription will be disabled."
                )
                self._use_audio_transcription = False

    def __del__(self):
        r"""Clean up temporary directories and files when the object is
        destroyed.
        """
        # Clean up temporary files
        for temp_file in self._temp_files:
            if os.path.exists(temp_file):
                try:
                    os.remove(temp_file)
                    logger.debug(f"Removed temporary file: {temp_file}")
                except OSError as e:
                    logger.warning(
                        f"Failed to remove temporary file {temp_file}: {e}"
                    )

        # Clean up temporary directory if needed
        if self._cleanup and os.path.exists(self._download_directory):
            try:
                import sys

                if getattr(sys, 'modules', None) is not None:
                    import shutil

                    shutil.rmtree(self._download_directory)
                    logger.debug(
                        f"Removed temp directory: {self._download_directory}"
                    )
            except (ImportError, AttributeError):
                # Skip cleanup if interpreter is shutting down
                pass
            except OSError as e:
                logger.warning(
                    f"Failed to remove temporary directory "
                    f"{self._download_directory}: {e}"
                )

    def _extract_audio_from_video(
        self, video_path: str, output_format: str = "mp3"
    ) -> str:
        r"""Extract audio from the video.

        Args:
            video_path (str): The path to the video file.
            output_format (str): The format of the audio file to be saved.
                (default: :obj:`"mp3"`)

        Returns:
            str: The path to the audio file.
        """
        import ffmpeg

        # Handle case where video file doesn't have an extension
        base_path = os.path.splitext(video_path)[0]
        output_path = f"{base_path}.{output_format}"

        try:
            (
                ffmpeg.input(video_path)
                .output(output_path, vn=None, acodec="libmp3lame")
                .run(quiet=True)
            )
            # Track the audio file for cleanup
            self._temp_files.append(output_path)
            return output_path
        except ffmpeg.Error as e:
            error_message = f"FFmpeg-Python failed: {e}"
            logger.error(error_message)
            raise RuntimeError(error_message)

    def _transcribe_audio(self, audio_path: str) -> str:
        r"""Transcribe the audio of the video."""
        # Check if audio transcription is enabled and audio models are
        # available
        if not self._use_audio_transcription or self.audio_models is None:
            logger.warning("Audio transcription is disabled or not available")
            return "No audio transcription available."

        try:
            audio_transcript = self.audio_models.speech_to_text(audio_path)
            if not audio_transcript:
                logger.warning("Audio transcription returned empty result")
                return "No audio transcription available."
            return audio_transcript
        except Exception as e:
            logger.error(f"Audio transcription failed: {e}")
            return "Audio transcription failed."

    def _extract_keyframes(self, video_path: str) -> List[Image.Image]:
        r"""Extract keyframes from a video based on scene changes and regular
        intervals, and return them as PIL.Image.Image objects.

        Args:
            video_path (str): Path to the video file.

        Returns:
            List[Image.Image]: A list of PIL.Image.Image objects representing
                the extracted keyframes.

        Raises:
            ValueError: If no frames could be extracted from the video.
        """
        import cv2
        import numpy as np
        from scenedetect import (  # type: ignore[import-untyped]
            SceneManager,
            open_video,
        )
        from scenedetect.detectors import (  # type: ignore[import-untyped]
            ContentDetector,
        )

        # Get video information
        cap = cv2.VideoCapture(video_path)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        duration = total_frames / fps if fps > 0 else 0
        cap.release()

        frame_interval = self.frame_interval  # seconds
        # Maximum number of frames to extract to avoid memory issues
        MAX_FRAMES = 100
        # Minimum time difference (in seconds) to consider frames as distinct
        TIME_THRESHOLD = 1.0

        # Calculate the total number of frames to extract
        if duration <= 0 or fps <= 0:
            logger.warning(
                "Invalid video duration or fps, using default frame count"
            )
            num_frames = 10
        else:
            num_frames = max(int(duration / frame_interval), 1)
            if num_frames > MAX_FRAMES:
                frame_interval = duration / MAX_FRAMES
                num_frames = MAX_FRAMES

        logger.info(
            f"Video duration: {duration:.2f}s, target frames: {num_frames} "
            f"at {frame_interval:.2f}s intervals"
        )

        # Use scene detection to extract keyframes
        # Use open_video instead of VideoManager
        video = open_video(video_path)
        scene_manager = SceneManager()
        scene_manager.add_detector(ContentDetector())

        # Detect scenes using the modern API
        scene_manager.detect_scenes(video)
        scenes = scene_manager.get_scene_list()

        keyframes: List[Image.Image] = []

        # If scene detection is successful, prioritize scene change points
        if scenes:
            logger.info(f"Detected {len(scenes)} scene changes")
            if len(scenes) > num_frames:
                scene_indices = np.linspace(
                    0, len(scenes) - 1, num_frames, dtype=int
                )
                selected_scenes = [scenes[i] for i in scene_indices]
            else:
                selected_scenes = scenes

            # Extract frames from scenes
            for scene in selected_scenes:
                try:
                    # Get start time in seconds
                    start_time = scene[0].get_seconds()
                    frame = _capture_screenshot(video_path, start_time)
                    keyframes.append(frame)
                except Exception as e:
                    logger.warning(
                        f"Failed to capture frame at scene change"
                        f" {scene[0].get_seconds()}s: {e}"
                    )

        if len(keyframes) < num_frames and duration > 0:
            logger.info(
                f"Scene detection provided {len(keyframes)} frames, "
                f"supplementing with regular interval frames"
            )

            existing_times = []
            if scenes:
                existing_times = [scene[0].get_seconds() for scene in scenes]

            regular_frames = []
            for i in range(num_frames):
                time_sec = i * frame_interval
                is_duplicate = False
                for existing_time in existing_times:
                    if abs(existing_time - time_sec) < TIME_THRESHOLD:
                        is_duplicate = True
                        break

                if not is_duplicate:
                    try:
                        frame = _capture_screenshot(video_path, time_sec)
                        regular_frames.append(frame)
                    except Exception as e:
                        logger.warning(
                            f"Failed to capture frame at {time_sec}s: {e}"
                        )

            frames_needed = num_frames - len(keyframes)
            if frames_needed > 0 and regular_frames:
                if len(regular_frames) > frames_needed:
                    indices = np.linspace(
                        0, len(regular_frames) - 1, frames_needed, dtype=int
                    )
                    selected_frames = [regular_frames[i] for i in indices]
                else:
                    selected_frames = regular_frames
                keyframes.extend(selected_frames)

        if not keyframes:
            logger.warning(
                "No frames extracted, falling back to simple interval "
                "extraction"
            )
            # Limit to a maximum of 10 frames to avoid infinite loops
            for i in range(min(num_frames, 10)):
                time_sec = i * (duration / 10 if duration > 0 else 6.0)
                try:
                    frame = _capture_screenshot(video_path, time_sec)
                    keyframes.append(frame)
                except Exception as e:
                    logger.warning(
                        f"Failed to capture frame at {time_sec}s: {e}"
                    )

        if not keyframes:
            error_msg = (
                f"Failed to extract any keyframes from video: {video_path}"
            )
            logger.error(error_msg)
            raise ValueError(error_msg)

        # Normalize image sizes
        normalized_keyframes = self._normalize_frames(keyframes)

        logger.info(
            f"Extracted and normalized {len(normalized_keyframes)} keyframes"
        )
        return normalized_keyframes

    def _normalize_frames(
        self, frames: List[Image.Image], target_width: int = 512
    ) -> List[Image.Image]:
        r"""Normalize the size of extracted frames.

        Args:
            frames (List[Image.Image]): List of frames to normalize.
            target_width (int): Target width for normalized frames.

        Returns:
            List[Image.Image]: List of normalized frames.
        """
        normalized_frames: List[Image.Image] = []

        for frame in frames:
            # Get original dimensions
            width, height = frame.size

            # Calculate new height, maintaining aspect ratio
            aspect_ratio = width / height
            new_height = int(target_width / aspect_ratio)

            # Resize image
            resized_frame = frame.resize(
                (target_width, new_height), Image.Resampling.LANCZOS
            )

            # Ensure the image has a proper format
            if resized_frame.mode != 'RGB':
                resized_frame = resized_frame.convert('RGB')

            # Create a new image with explicit format
            with io.BytesIO() as buffer:
                resized_frame.save(buffer, format='JPEG')
                buffer.seek(0)
                formatted_frame = Image.open(buffer)
                formatted_frame.load()  # Load the image data

            normalized_frames.append(formatted_frame)

        return normalized_frames
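    # Sizing example for _normalize_frames (illustrative numbers): a
    # 1920x1080 frame has aspect ratio 1920/1080 ~= 1.78, so with the
    # default target_width=512 the new height is int(512 / 1.78) == 288,
    # yielding a 512x288 RGB frame re-encoded through a JPEG buffer.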
    def ask_question_about_video(
        self,
        video_path: str,
        question: str,
    ) -> str:
        r"""Ask a question about the video.

        Args:
            video_path (str): The path to the video file.
                It can be a local file or a URL (such as a YouTube link).
            question (str): The question to ask about the video.

        Returns:
            str: The answer to the question.
        """
        from urllib.parse import urlparse

        parsed_url = urlparse(video_path)
        is_url = all([parsed_url.scheme, parsed_url.netloc])

        downloaded_video_path = None
        try:
            if is_url:
                downloaded_video_path = (
                    self.video_downloader_toolkit.download_video(video_path)
                )
                if not downloaded_video_path or not os.path.exists(
                    downloaded_video_path
                ):
                    raise ValueError(
                        f"Failed to download video from {video_path}"
                    )
                video_path = downloaded_video_path

            if not os.path.exists(video_path):
                raise FileNotFoundError(f"Video file not found: {video_path}")

            audio_transcript = "No audio transcription available."
            if self._use_audio_transcription:
                audio_path = self._extract_audio_from_video(video_path)
                audio_transcript = self._transcribe_audio(audio_path)

            video_frames = self._extract_keyframes(video_path)

            prompt = VIDEO_QA_PROMPT.format(
                audio_transcription=audio_transcript,
                question=question,
            )

            msg = BaseMessage.make_user_message(
                role_name="User",
                content=prompt,
                image_list=video_frames,
            )

            # Reset the agent to clear previous state
            self.vl_agent.reset()

            response = self.vl_agent.step(msg)
            if not response or not response.msgs:
                logger.error("Model returned empty response")
                return (
                    "Failed to generate an answer. "
                    "The model returned an empty response."
                )

            answer = response.msgs[0].content
            return answer
        except Exception as e:
            error_message = f"Error processing video: {e}"
            logger.error(error_message)
            return f"Error: {error_message}"
    def get_tools(self) -> List[FunctionTool]:
        r"""Returns a list of FunctionTool objects representing the
        functions in the toolkit.

        Returns:
            List[FunctionTool]: A list of FunctionTool objects representing
                the functions in the toolkit.
        """
        return [FunctionTool(self.ask_question_about_video)]
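# Minimal usage sketch (illustrative; the file path and question are
# hypothetical, and a vision-capable model backend is assumed, either passed
# via `model` or picked up as the ChatAgent default):
#
#     toolkit = VideoAnalysisToolkit(use_audio_transcription=False)
#     answer = toolkit.ask_question_about_video(
#         video_path="/path/to/video.mp4",
#         question="What objects appear in the video?",
#     )
#     print(answer)
#
# To expose the same capability to an agent as a tool instead:
#
#     tools = toolkit.get_tools()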