Source code for camel.models.vllm_model

# =========== Copyright 2023 @ CAMEL-AI.org. All Rights Reserved. ===========
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =========== Copyright 2023 @ CAMEL-AI.org. All Rights Reserved. ===========
import os
import subprocess
from typing import Any, Dict, List, Optional, Union

from openai import OpenAI, Stream

from camel.configs import VLLM_API_PARAMS
from camel.messages import OpenAIMessage
from camel.types import ChatCompletion, ChatCompletionChunk, ModelType
from camel.utils import BaseTokenCounter, OpenAITokenCounter


# flake8: noqa: E501
[docs] class VLLMModel: r"""vLLM service interface.""" def __init__( self, model_type: str, model_config_dict: Dict[str, Any], url: Optional[str] = None, api_key: Optional[str] = None, token_counter: Optional[BaseTokenCounter] = None, ) -> None: r"""Constructor for vLLM backend with OpenAI compatibility. # Reference: https://docs.vllm.ai/en/latest/serving/openai_compatible_server.html Args: model_type (str): Model for which a backend is created. model_config_dict (Dict[str, Any]): A dictionary that will be fed into openai.ChatCompletion.create(). url (Optional[str]): The url to the model service. (default: :obj:`"http://localhost:8000/v1"`) api_key (Optional[str]): The API key for authenticating with the model service. token_counter (Optional[BaseTokenCounter]): Token counter to use for the model. If not provided, `OpenAITokenCounter(ModelType. GPT_4O_MINI)` will be used. """ self.model_type = model_type self.model_config_dict = model_config_dict self._url = ( url or os.environ.get("VLLM_BASE_URL") or "http://localhost:8000/v1" ) if not url and not os.environ.get("VLLM_BASE_URL"): self._start_server() # Use OpenAI cilent as interface call vLLM self._client = OpenAI( timeout=60, max_retries=3, base_url=self._url, api_key=api_key, ) self._token_counter = token_counter self.check_model_config() def _start_server(self) -> None: r"""Starts the vllm server in a subprocess.""" try: subprocess.Popen( ["vllm", "server", "--port", "8000"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) print( f"vllm server started on http://localhost:8000/v1 " f"for {self.model_type} model." ) except Exception as e: print(f"Failed to start vllm server: {e}.") @property def token_counter(self) -> BaseTokenCounter: r"""Initialize the token counter for the model backend. Returns: BaseTokenCounter: The token counter following the model's tokenization style. """ if not self._token_counter: self._token_counter = OpenAITokenCounter(ModelType.GPT_4O_MINI) return self._token_counter
[docs] def check_model_config(self): r"""Check whether the model configuration contains any unexpected arguments to vLLM API. Raises: ValueError: If the model configuration dictionary contains any unexpected arguments to OpenAI API. """ for param in self.model_config_dict: if param not in VLLM_API_PARAMS: raise ValueError( f"Unexpected argument `{param}` is " "input into vLLM model backend." )
[docs] def run( self, messages: List[OpenAIMessage], ) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]: r"""Runs inference of OpenAI chat completion. Args: messages (List[OpenAIMessage]): Message list with the chat history in OpenAI API format. Returns: Union[ChatCompletion, Stream[ChatCompletionChunk]]: `ChatCompletion` in the non-stream mode, or `Stream[ChatCompletionChunk]` in the stream mode. """ response = self._client.chat.completions.create( messages=messages, model=self.model_type, **self.model_config_dict, ) return response
@property def token_limit(self) -> int: r"""Returns the maximum token limit for the given model. Returns: int: The maximum token limit for the given model. """ max_tokens = self.model_config_dict.get("max_tokens") if isinstance(max_tokens, int): return max_tokens print( "Must set `max_tokens` as an integer in `model_config_dict` when" " setting up the model. Using 4096 as default value." ) return 4096 @property def stream(self) -> bool: r"""Returns whether the model is in stream mode, which sends partial results each time. Returns: bool: Whether the model is in stream mode. """ return self.model_config_dict.get('stream', False)