Source code for camel.runtime.remote_http_runtime

# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
import atexit
import json
import logging
import subprocess
import time
from functools import wraps
from pathlib import Path
from subprocess import Popen
from typing import Any, Dict, List, Optional, Union

import requests
from pydantic import BaseModel

from camel.runtime import BaseRuntime
from camel.toolkits.function_tool import FunctionTool

logger = logging.getLogger(__name__)


[docs] class RemoteHttpRuntime(BaseRuntime): r"""A runtime that runs functions in a remote HTTP server. You need to run the API server in the remote server first. Args: host (str): The host of the remote server. port (int): The port of the remote server. (default: :obj: `8000`) python_exec (str): The python executable to run the API server. (default: :obj: `python3`) """ def __init__( self, host: str, port: int = 8000, python_exec: str = "python3" ): super().__init__() self.host = host self.port = port self.python_exec = python_exec self.api_path = Path(__file__).parent / "api.py" self.entrypoint: Dict[str, str] = dict() self.process: Optional[Popen] = None
[docs] def build(self) -> "RemoteHttpRuntime": r"""Build the API server. Returns: RemoteHttpRuntime: The current runtime. """ self.process = subprocess.Popen( [ self.python_exec, str(self.api_path), *list(self.entrypoint.values()), ] ) atexit.register(self._cleanup) return self
def _cleanup(self): r"""Clean up the API server when exiting.""" if self.process and self.process.poll() is None: self.process.terminate() self.process.wait() self.process = None
[docs] def add( # type: ignore[override] self, funcs: Union[FunctionTool, List[FunctionTool]], entrypoint: str, redirect_stdout: bool = False, arguments: Optional[Dict[str, Any]] = None, ) -> "RemoteHttpRuntime": r"""Add a function or list of functions to the runtime. Args: funcs (Union[FunctionTool, List[FunctionTool]]): The function or list of functions to add. entrypoint (str): The entrypoint for the function. redirect_stdout (bool): Whether to return the stdout of the function. (default: :obj: `False`) arguments (Optional[Dict[str, Any]]): The arguments for the function. (default: :obj: `None`) Returns: RemoteHttpRuntime: The current runtime. """ if not isinstance(funcs, list): funcs = [funcs] if arguments is not None: entrypoint += json.dumps(arguments, ensure_ascii=False) for func in funcs: inner_func = func.func # Create a wrapper that explicitly binds `func` @wraps(inner_func) def wrapper( *args, func=func, redirect_stdout=redirect_stdout, **kwargs ): for key, value in kwargs.items(): if isinstance(value, BaseModel): kwargs[key] = value.model_dump() resp = requests.post( f"http://{self.host}:{self.port}/{func.get_function_name()}", json=dict( args=args, kwargs=kwargs, redirect_stdout=redirect_stdout, ), ) if resp.status_code != 200: logger.error( f"""ailed to execute function: {func.get_function_name()}, status code: {resp.status_code}, response: {resp.text}""" ) return { "error": f"""Failed to execute function: {func.get_function_name()}, response: {resp.text}""" } data = resp.json() if redirect_stdout: print(data["stdout"]) return json.loads(data["output"]) func.func = wrapper self.tools_map[func.get_function_name()] = func self.entrypoint[func.get_function_name()] = entrypoint return self
@property def ok(self) -> bool: r"""Check if the API Server is running. Returns: bool: Whether the API Server is running. """ try: _ = requests.get(f"http://{self.host}:{self.port}") return True except requests.exceptions.ConnectionError: return False
[docs] def wait(self, timeout: int = 10) -> bool: r"""Wait for the API Server to be ready. Args: timeout (int): The number of seconds to wait. (default: :obj: `10`) Returns: bool: Whether the API Server is ready. """ for _ in range(timeout): if self.ok: return True time.sleep(1) return False
def __del__(self): r"""Clean up the API server when the object is deleted.""" self._cleanup()
[docs] def stop(self) -> "RemoteHttpRuntime": r"""Stop the API server. Returns: RemoteHttpRuntime: The current runtime. """ self._cleanup() return self
[docs] def reset(self) -> "RemoteHttpRuntime": r"""Reset the API server. Returns: RemoteHttpRuntime: The current runtime. """ return self.stop().build()
@property def docs(self) -> str: r"""Get the URL for the API documentation. Returns: str: The URL for the API documentation. """ return f"http://{self.host}:{self.port}/docs"