Source code for camel.runtime.docker_runtime

# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
import io
import json
import logging
import os
import tarfile
import time
from functools import wraps
from pathlib import Path
from random import randint
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

import requests
from pydantic import BaseModel
from tqdm import tqdm

from camel.runtime import BaseRuntime, TaskConfig
from camel.toolkits import FunctionTool

if TYPE_CHECKING:
    from docker.models.containers import Container

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)


class DockerRuntime(BaseRuntime):
    r"""A class representing a runtime environment using Docker.
    This class automatically wraps functions to be executed
    in a Docker container.

    Args:
        image (str): The name of the Docker image to use for the runtime.
        port (int): The host port number to use for the runtime API. A
            non-positive value selects a random port between 10000 and
            20000. (default: :obj:`8000`)
        remove (bool): Whether to remove the container after stopping it.
            (default: :obj:`True`)
        kwargs (dict): Additional keyword arguments forwarded to
            ``client.containers.create`` when the container is built.
    """

    def __init__(
        self, image: str, port: int = 8000, remove: bool = True, **kwargs
    ):
        super().__init__()

        # Imported lazily so that `docker` is only required when a
        # DockerRuntime is actually instantiated.
        import docker

        self.client = docker.from_env()
        self.container: Optional[Container] = None

        api_path = Path(__file__).parent / "api.py"
        # Host directory -> container directory bind mounts.
        self.mounts: Dict[Path, Path] = {}
        # Host path -> container directory copies; the API server script
        # is always copied into /home.
        self.cp: Dict[Path, Path] = {api_path: Path("/home")}
        # Function name -> entrypoint string passed to api.py on startup.
        self.entrypoint: Dict[str, str] = {}
        # Commands executed inside the container during build().
        self.tasks: List[TaskConfig] = []

        self.docker_config = kwargs
        self.image = image
        self.port = port if port > 0 else randint(10000, 20000)
        self.remove = remove

        if not self.client.images.list(name=self.image):
            logger.warning(
                f"Image {self.image} not found. Pulling from Docker Hub."
            )
            self.client.images.pull(self.image)

    def mount(self, path: str, mount_path: str) -> "DockerRuntime":
        r"""Mount a local directory to the container.

        Args:
            path (str): The local path to mount.
            mount_path (str): The path to mount the local directory to in the
                container.

        Returns:
            DockerRuntime: The DockerRuntime instance.

        Raises:
            FileNotFoundError: If the local path does not exist.
            NotADirectoryError: If the local path is not a directory.
            ValueError: If either path is not absolute.
        """
        _path, _mount_path = Path(path), Path(mount_path)
        if not _path.exists():
            raise FileNotFoundError(f"Path {_path} does not exist.")
        if not _path.is_dir():
            raise NotADirectoryError(f"Path {_path} is not a directory.")
        if not _path.is_absolute():
            raise ValueError(f"Path {_path} is not absolute.")
        if not _mount_path.is_absolute():
            raise ValueError(f"Mount path {_mount_path} is not absolute.")

        self.mounts[_path] = _mount_path
        return self

    def copy(self, source: str, dest: str) -> "DockerRuntime":
        r"""Copy a file or directory to the container.

        The copy is only registered here; it is performed by :meth:`build`.

        Args:
            source (str): The local path to the file.
            dest (str): The path to copy the file to in the container.

        Returns:
            DockerRuntime: The DockerRuntime instance.

        Raises:
            FileNotFoundError: If the source path does not exist.
        """
        _source, _dest = Path(source), Path(dest)
        if not _source.exists():
            raise FileNotFoundError(f"Source {_source} does not exist.")

        self.cp[_source] = _dest
        return self

    def add_task(
        self,
        task: TaskConfig,
    ) -> "DockerRuntime":
        r"""Add a task to run a command inside the container when building.
        Similar to `docker exec`.

        Args:
            task (TaskConfig): The configuration for the task.

        Returns:
            DockerRuntime: The DockerRuntime instance.
        """
        self.tasks.append(task)
        return self

    def exec_run(
        self,
        task: TaskConfig,
    ) -> Any:
        r"""Run a command inside this container. Similar to `docker exec`.

        Args:
            task (TaskConfig): The configuration for the task.

        Returns:
            (ExecResult): A tuple of (exit_code, output)
                exit_code: (int):
                    Exit code for the executed command or `None` if
                    either `stream` or `socket` is `True`.
                output: (generator, bytes, or tuple):
                    If `stream=True`, a generator yielding response chunks.
                    If `socket=True`, a socket object for the connection.
                    If `demux=True`, a tuple of two bytes: stdout and stderr.
                    A bytestring containing response data otherwise.

        Raises:
            RuntimeError: If the container does not exist.
        """
        if not self.container:
            raise RuntimeError(
                "Container does not exist. Please build the container first."
            )

        return self.container.exec_run(**task.model_dump())

    def build(self, time_out: int = 15) -> "DockerRuntime":
        r"""Build the Docker container and start it.

        Creates and starts the container, copies the registered files into
        it, runs the registered tasks and finally launches the API server
        (``api.py``) in the background.

        Args:
            time_out (int): The number of seconds to wait for the container
                to start. (default: :obj:`15`)

        Returns:
            DockerRuntime: The DockerRuntime instance.

        Raises:
            RuntimeError: If the container cannot be created or started, or
                if copying a file into the container fails.
        """
        if self.container:
            logger.warning("Container already exists. Nothing to build.")
            return self

        import docker
        from docker.types import Mount

        mounts = [
            Mount(target=str(mount_path), source=str(local_path), type="bind")
            for local_path, mount_path in self.mounts.items()
        ]

        container_params = {
            "image": self.image,
            "detach": True,
            "mounts": mounts,
            "command": "sleep infinity",
            **self.docker_config,
        }
        # Set after merging the user config so the API port mapping always
        # wins over any "ports" entry supplied through kwargs.
        container_params["ports"] = {"8000/tcp": self.port}

        try:
            self.container = self.client.containers.create(**container_params)
        except docker.errors.APIError as e:
            raise RuntimeError(f"Failed to create container: {e!s}") from e

        try:
            self.container.start()
            # Wait for the container to start
            for _ in range(time_out):
                self.container.reload()
                logger.debug(f"Container status: {self.container.status}")
                if self.container.status == "running":
                    break
                time.sleep(1)
            else:
                # The loop ended without observing "running". Keep going
                # (as before) but make the condition visible in the logs.
                logger.warning(
                    f"Container did not report 'running' within "
                    f"{time_out} seconds."
                )
        except docker.errors.APIError as e:
            raise RuntimeError(f"Failed to start container: {e!s}") from e

        # Copy files to the container if specified
        for local_path, container_path in self.cp.items():
            logger.info(f"Copying {local_path} to {container_path}")
            try:
                with io.BytesIO() as tar_stream:
                    with tarfile.open(fileobj=tar_stream, mode="w") as tar:
                        tar.add(
                            local_path, arcname=os.path.basename(local_path)
                        )
                    # The archive is only complete once the tarfile has
                    # been closed; read the buffer after that point.
                    tar_stream.seek(0)
                    self.container.put_archive(
                        str(container_path), tar_stream.getvalue()
                    )
            except docker.errors.APIError as e:
                raise RuntimeError(
                    f"Failed to copy file {local_path} to container: {e!s}"
                ) from e

        if self.tasks:
            for task in tqdm(self.tasks, desc="Running tasks"):
                self.exec_run(task)

        # Start the API server in the background, passing one entrypoint
        # string per registered function.
        command = ["python3", "api.py", *self.entrypoint.values()]
        self.container.exec_run(command, workdir="/home", detach=True)

        logger.info(f"Container started on port {self.port}")
        return self

    def add(  # type: ignore[override]
        self,
        funcs: Union[FunctionTool, List[FunctionTool]],
        entrypoint: str,
        redirect_stdout: bool = False,
        arguments: Optional[Dict[str, Any]] = None,
    ) -> "DockerRuntime":
        r"""Add a function or list of functions to the runtime.

        Each tool's underlying callable is replaced by a wrapper that
        forwards the call over HTTP to the API server on ``self.port``.

        Args:
            funcs (Union[FunctionTool, List[FunctionTool]]): The function or
                list of functions to add.
            entrypoint (str): The entrypoint for the function.
            redirect_stdout (bool): Whether to return the stdout of
                the function. (default: :obj:`False`)
            arguments (Optional[Dict[str, Any]]): The arguments for the
                function. When given, they are JSON-encoded and appended to
                ``entrypoint``. (default: :obj:`None`)

        Returns:
            DockerRuntime: The DockerRuntime instance.
        """
        if not isinstance(funcs, list):
            funcs = [funcs]

        if arguments is not None:
            entrypoint += json.dumps(arguments, ensure_ascii=False)

        for func in funcs:
            inner_func = func.func

            # Create a wrapper that explicitly binds `func`; binding it as
            # a default argument avoids the late-binding closure pitfall
            # when several tools are registered in one call.
            @wraps(inner_func)
            def wrapper(
                *args, func=func, redirect_stdout=redirect_stdout, **kwargs
            ):
                name = func.get_function_name()
                # Pydantic models are not JSON serializable as-is.
                payload_kwargs = {
                    key: (
                        value.model_dump()
                        if isinstance(value, BaseModel)
                        else value
                    )
                    for key, value in kwargs.items()
                }

                resp = requests.post(
                    f"http://localhost:{self.port}/{name}",
                    json=dict(
                        args=args,
                        kwargs=payload_kwargs,
                        redirect_stdout=redirect_stdout,
                    ),
                )
                if resp.status_code != 200:
                    logger.error(
                        f"Failed to execute function: {name}, "
                        f"status code: {resp.status_code}, "
                        f"response: {resp.text}"
                    )
                    return {
                        "error": f"Failed to execute function: {name}, "
                        f"response: {resp.text}"
                    }
                data = resp.json()
                if redirect_stdout:
                    print(data["stdout"])
                return json.loads(data["output"])

            func.func = wrapper
            self.tools_map[func.get_function_name()] = func
            self.entrypoint[func.get_function_name()] = entrypoint

        return self

    def reset(self) -> "DockerRuntime":
        r"""Reset the DockerRuntime instance by stopping the current
        container and building a new one.

        Returns:
            DockerRuntime: The DockerRuntime instance.
        """
        return self.stop().build()

    def stop(self, remove: Optional[bool] = None) -> "DockerRuntime":
        r"""Stop the Docker container.

        Args:
            remove (Optional[bool]): Whether to remove the container
                after stopping it. If :obj:`None`, the value given at
                construction time is used. (default: :obj:`None`)

        Returns:
            DockerRuntime: The DockerRuntime instance.
        """
        if self.container:
            self.container.stop()
            if remove is None:
                remove = self.remove
            if remove:
                logger.info("Removing container.")
                self.container.remove()
            # Drop the handle so that build() can create a new container.
            self.container = None
        else:
            logger.warning("No container to stop.")
        return self

    @property
    def ok(self) -> bool:
        r"""Check if the API Server is running.

        Any HTTP response counts as running; only a connection error (or a
        missing container) is reported as not running.

        Returns:
            bool: Whether the API Server is running.
        """
        if not self.container:
            return False
        try:
            _ = requests.get(f"http://localhost:{self.port}")
            return True
        except requests.exceptions.ConnectionError:
            return False

    def wait(self, timeout: int = 10) -> bool:
        r"""Wait for the API Server to be ready.

        Polls :attr:`ok` once per second.

        Args:
            timeout (int): The number of seconds to wait.
                (default: :obj:`10`)

        Returns:
            bool: Whether the API Server is ready.
        """
        for _ in range(timeout):
            if self.ok:
                return True
            time.sleep(1)
        return False

    def __enter__(self) -> "DockerRuntime":
        r"""Enter the context manager, building the container if needed.

        Returns:
            DockerRuntime: The DockerRuntime instance.
        """
        if not self.container:
            return self.build()
        logger.warning(
            "Container already exists. Returning existing container."
        )
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        r"""Exit the context manager and stop the container."""
        self.stop()

    @property
    def docs(self) -> str:
        r"""Get the URL for the API documentation.

        Returns:
            str: The URL for the API documentation.
        """
        return f"http://localhost:{self.port}/docs"