Source code for camel.toolkits.video_toolkit
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
import io
import logging
import re
import tempfile
from pathlib import Path
from typing import List, Optional
from PIL import Image
from camel.toolkits.base import BaseToolkit
from camel.toolkits.function_tool import FunctionTool
from camel.utils import dependencies_required
logger = logging.getLogger(__name__)
def _standardize_url(url: str) -> str:
    r"""Standardize the given URL.

    YouTube embed URLs (containing ``youtube.com/embed/<video_id>``) are
    rewritten to the canonical ``watch?v=<video_id>`` form. Any other URL is
    returned unchanged.

    Args:
        url (str): The URL to standardize.

    Returns:
        str: The canonical YouTube watch URL for embed links, otherwise the
            input URL as is.

    Raises:
        ValueError: If the URL contains ``youtube.com/embed/`` but no video
            ID follows it.
    """
    if "youtube.com/embed/" not in url:
        return url

    # Anchor the pattern on the same "youtube.com/embed/" marker that
    # selected this branch, so an unrelated earlier "embed/" path segment
    # cannot be mistaken for the video ID.
    match = re.search(r"youtube\.com/embed/([a-zA-Z0-9_-]+)", url)
    if match is None:
        raise ValueError(f"Invalid YouTube URL: {url}")
    return f"https://www.youtube.com/watch?v={match.group(1)}"
def _capture_screenshot(video_file: str, timestamp: float) -> Image.Image:
    r"""Capture a screenshot from a video file at a specific timestamp.

    The frame is extracted with ffmpeg, scaled to a width of 320 pixels
    (the ``-1`` height asks ffmpeg to derive the height from the aspect
    ratio), encoded as PNG and read back from ffmpeg's stdout.

    Args:
        video_file (str): The path to the video file.
        timestamp (float): The time in seconds from which to capture the
            screenshot.

    Returns:
        Image.Image: The captured screenshot in the form of Image.Image.

    Raises:
        RuntimeError: If ffmpeg fails, or finishes without producing any
            image data.
    """
    import ffmpeg

    try:
        out, _ = (
            ffmpeg.input(video_file, ss=timestamp)
            .filter('scale', 320, -1)
            .output('pipe:', vframes=1, format='image2', vcodec='png')
            .run(capture_stdout=True, capture_stderr=True)
        )
    except ffmpeg.Error as e:
        # Keep the ffmpeg error as the explicit cause for debugging.
        raise RuntimeError(
            f"Failed to capture screenshot: {e.stderr}"
        ) from e

    # ffmpeg may exit successfully without emitting a frame (for example
    # when the timestamp lies beyond the end of the video); fail clearly
    # here instead of letting PIL choke on an empty buffer.
    if not out:
        raise RuntimeError(
            f"Failed to capture screenshot: no frame produced at "
            f"{timestamp}s for {video_file}"
        )

    return Image.open(io.BytesIO(out))
[docs]
class VideoDownloaderToolkit(BaseToolkit):
    r"""A class for downloading videos and extracting their raw bytes or
    evenly spaced screenshots.

    Args:
        download_directory (Optional[str], optional): The directory where the
            video will be downloaded to. If not provided, video will be stored
            in a temporary directory and will be cleaned up after use.
            (default: :obj:`None`)
        cookies_path (Optional[str], optional): The path to the cookies file
            for the video service in Netscape format. (default: :obj:`None`)
    """

    @dependencies_required("yt_dlp", "ffmpeg")
    def __init__(
        self,
        download_directory: Optional[str] = None,
        cookies_path: Optional[str] = None,
    ) -> None:
        # A temporary directory is created whenever no usable directory is
        # given (``None`` or an empty string). Cleanup must follow exactly
        # the same condition, otherwise the temporary directory leaks.
        use_temp_dir = not download_directory
        self._cleanup = use_temp_dir
        self._cookies_path = cookies_path

        self._download_directory = Path(
            tempfile.mkdtemp() if use_temp_dir else download_directory
        ).resolve()

        try:
            self._download_directory.mkdir(parents=True, exist_ok=True)
        except FileExistsError as e:
            # The path exists but is not a directory.
            raise ValueError(
                f"{self._download_directory} is not a valid directory."
            ) from e
        except OSError as e:
            raise ValueError(
                f"Error creating directory {self._download_directory}: {e}"
            ) from e

        logger.info(f"Video will be downloaded to {self._download_directory}")

    def __del__(self) -> None:
        r"""Deconstructor for the VideoDownloaderToolkit class.

        Cleans up the downloaded video if they are stored in a temporary
        directory.
        """
        # __init__ (or its dependency check) may have raised before these
        # attributes were assigned, so read them defensively.
        if not getattr(self, "_cleanup", False):
            return
        directory = getattr(self, "_download_directory", None)
        if directory is None:
            return

        try:
            import shutil

            shutil.rmtree(directory, ignore_errors=True)
        except (ImportError, AttributeError):
            # Best effort only: the import machinery may already be torn
            # down when the interpreter is shutting down.
            pass

    def _download_video(self, url: str) -> str:
        r"""Download the video at the given URL into the download directory.

        yt-dlp will detect if the video is downloaded automatically so there
        is no need to check if the video exists.

        Args:
            url (str): The URL of the video to download.

        Returns:
            str: The path to the downloaded video file.

        Raises:
            RuntimeError: If yt-dlp fails to download the video.
        """
        import yt_dlp

        video_template = self._download_directory / "%(title)s.%(ext)s"
        ydl_opts = {
            'format': 'bestvideo+bestaudio/best',
            'outtmpl': str(video_template),
            'force_generic_extractor': True,
            'cookiefile': self._cookies_path,
        }

        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                # Download the video and get the filename
                logger.info(f"Downloading video from {url}...")
                info = ydl.extract_info(url, download=True)
                return ydl.prepare_filename(info)
        except yt_dlp.utils.DownloadError as e:
            raise RuntimeError(
                f"Failed to download video from {url}: {e}"
            ) from e

    def get_video_bytes(
        self,
        video_url: str,
    ) -> bytes:
        r"""Download video by the URL, and return the content in bytes.

        Args:
            video_url (str): The URL of the video to download.

        Returns:
            bytes: The video file content in bytes.
        """
        url = _standardize_url(video_url)
        video_file = self._download_video(url)

        with open(video_file, 'rb') as f:
            return f.read()

    def get_video_screenshots(
        self, video_url: str, amount: int
    ) -> List[Image.Image]:
        r"""Capture evenly spaced screenshots from the video.

        The video is divided into ``amount + 1`` equal intervals and one
        screenshot is taken at each interior boundary, so the very first and
        very last moments of the video are not captured.

        Args:
            video_url (str): The URL of the video to take screenshots.
            amount (int): the amount of evenly split screenshots to capture.
                Must not be negative; ``0`` yields an empty list.

        Returns:
            List[Image.Image]: A list of screenshots as Image.Image.

        Raises:
            ValueError: If ``amount`` is negative.
            RuntimeError: If the video length cannot be determined.
        """
        import ffmpeg

        # Reject invalid input before downloading anything; ``amount == -1``
        # would otherwise cause a division by zero below.
        if amount < 0:
            raise ValueError(
                f"amount must be a non-negative integer, got {amount}"
            )

        url = _standardize_url(video_url)
        video_file = self._download_video(url)

        # Get the video length
        try:
            probe = ffmpeg.probe(video_file)
            video_length = float(probe['format']['duration'])
        except ffmpeg.Error as e:
            raise RuntimeError(
                f"Failed to determine video length: {e.stderr}"
            ) from e
        except (KeyError, ValueError) as e:
            # The probe succeeded but reported no usable duration.
            raise RuntimeError(
                f"Failed to determine video length: {e}"
            ) from e

        interval = video_length / (amount + 1)
        return [
            _capture_screenshot(video_file, index * interval)
            for index in range(1, amount + 1)
        ]

    def get_tools(self) -> List[FunctionTool]:
        r"""Returns a list of FunctionTool objects representing the
        functions in the toolkit.

        Returns:
            List[FunctionTool]: A list of FunctionTool objects representing
                the functions in the toolkit.
        """
        return [
            FunctionTool(self.get_video_bytes),
            FunctionTool(self.get_video_screenshots),
        ]