Source code for camel.models.openai_compatible_model

# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========

import os
from typing import Any, Dict, List, Optional, Type, Union

from openai import AsyncOpenAI, AsyncStream, OpenAI, Stream
from pydantic import BaseModel

from camel.messages import OpenAIMessage
from camel.models.base_model import BaseModelBackend
from camel.types import (
    ChatCompletion,
    ChatCompletionChunk,
    ModelType,
)
from camel.utils import (
    BaseTokenCounter,
    OpenAITokenCounter,
)


[docs] class OpenAICompatibleModel(BaseModelBackend): r"""Constructor for model backend supporting OpenAI compatibility. Args: model_type (Union[ModelType, str]): Model for which a backend is created. model_config_dict (Optional[Dict[str, Any]], optional): A dictionary that will be fed into:obj:`openai.ChatCompletion.create()`. If :obj:`None`, :obj:`{}` will be used. (default: :obj:`None`) api_key (str): The API key for authenticating with the model service. url (str): The url to the model service. token_counter (Optional[BaseTokenCounter], optional): Token counter to use for the model. If not provided, :obj:`OpenAITokenCounter( ModelType.GPT_4O_MINI)` will be used. (default: :obj:`None`) timeout (Optional[float], optional): The timeout value in seconds for API calls. If not provided, will fall back to the MODEL_TIMEOUT environment variable or default to 180 seconds. (default: :obj:`None`) """ def __init__( self, model_type: Union[ModelType, str], model_config_dict: Optional[Dict[str, Any]] = None, api_key: Optional[str] = None, url: Optional[str] = None, token_counter: Optional[BaseTokenCounter] = None, timeout: Optional[float] = None, ) -> None: api_key = api_key or os.environ.get("OPENAI_COMPATIBILITY_API_KEY") url = url or os.environ.get("OPENAI_COMPATIBILITY_API_BASE_URL") timeout = timeout or float(os.environ.get("MODEL_TIMEOUT", 180)) super().__init__( model_type, model_config_dict, api_key, url, token_counter, timeout ) self._client = OpenAI( timeout=self._timeout, max_retries=3, api_key=self._api_key, base_url=self._url, ) self._async_client = AsyncOpenAI( timeout=self._timeout, max_retries=3, api_key=self._api_key, base_url=self._url, ) def _run( self, messages: List[OpenAIMessage], response_format: Optional[Type[BaseModel]] = None, tools: Optional[List[Dict[str, Any]]] = None, ) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]: r"""Runs inference of OpenAI chat completion. Args: messages (List[OpenAIMessage]): Message list with the chat history in OpenAI API format. response_format (Optional[Type[BaseModel]]): The format of the response. tools (Optional[List[Dict[str, Any]]]): The schema of the tools to use for the request. Returns: Union[ChatCompletion, Stream[ChatCompletionChunk]]: `ChatCompletion` in the non-stream mode, or `Stream[ChatCompletionChunk]` in the stream mode. """ response_format = response_format or self.model_config_dict.get( "response_format", None ) if response_format: return self._request_parse(messages, response_format, tools) else: return self._request_chat_completion(messages, tools) async def _arun( self, messages: List[OpenAIMessage], response_format: Optional[Type[BaseModel]] = None, tools: Optional[List[Dict[str, Any]]] = None, ) -> Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]: r"""Runs inference of OpenAI chat completion in async mode. Args: messages (List[OpenAIMessage]): Message list with the chat history in OpenAI API format. response_format (Optional[Type[BaseModel]]): The format of the response. tools (Optional[List[Dict[str, Any]]]): The schema of the tools to use for the request. Returns: Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]: `ChatCompletion` in the non-stream mode, or `AsyncStream[ChatCompletionChunk]` in the stream mode. """ response_format = response_format or self.model_config_dict.get( "response_format", None ) if response_format: return await self._arequest_parse(messages, response_format, tools) else: return await self._arequest_chat_completion(messages, tools) def _request_chat_completion( self, messages: List[OpenAIMessage], tools: Optional[List[Dict[str, Any]]] = None, ) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]: request_config = self.model_config_dict.copy() if tools: request_config["tools"] = tools return self._client.chat.completions.create( messages=messages, model=self.model_type, **request_config, ) async def _arequest_chat_completion( self, messages: List[OpenAIMessage], tools: Optional[List[Dict[str, Any]]] = None, ) -> Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]: request_config = self.model_config_dict.copy() if tools: request_config["tools"] = tools return await self._async_client.chat.completions.create( messages=messages, model=self.model_type, **request_config, ) def _request_parse( self, messages: List[OpenAIMessage], response_format: Type[BaseModel], tools: Optional[List[Dict[str, Any]]] = None, ) -> ChatCompletion: import copy request_config = copy.deepcopy(self.model_config_dict) # Remove stream from request_config since OpenAI does not support it # when structured response is used request_config["response_format"] = response_format request_config.pop("stream", None) if tools is not None: request_config["tools"] = tools return self._client.beta.chat.completions.parse( messages=messages, model=self.model_type, **request_config, ) async def _arequest_parse( self, messages: List[OpenAIMessage], response_format: Type[BaseModel], tools: Optional[List[Dict[str, Any]]] = None, ) -> ChatCompletion: import copy request_config = copy.deepcopy(self.model_config_dict) # Remove stream from request_config since OpenAI does not support it # when structured response is used request_config["response_format"] = response_format request_config.pop("stream", None) if tools is not None: request_config["tools"] = tools return await self._async_client.beta.chat.completions.parse( messages=messages, model=self.model_type, **request_config, ) @property def token_counter(self) -> BaseTokenCounter: r"""Initialize the token counter for the model backend. Returns: OpenAITokenCounter: The token counter following the model's tokenization style. """ if not self._token_counter: self._token_counter = OpenAITokenCounter(ModelType.GPT_4O_MINI) return self._token_counter @property def stream(self) -> bool: r"""Returns whether the model is in stream mode, which sends partial results each time. Returns: bool: Whether the model is in stream mode. """ return self.model_config_dict.get('stream', False)
[docs] def check_model_config(self): pass