Source code for camel.models.openai_model

# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
import os
import warnings
from typing import Any, Dict, List, Optional, Type, Union

from openai import AsyncOpenAI, AsyncStream, OpenAI, Stream
from pydantic import BaseModel

from camel.configs import OPENAI_API_PARAMS, ChatGPTConfig
from camel.messages import OpenAIMessage
from camel.models import BaseModelBackend
from camel.types import (
    ChatCompletion,
    ChatCompletionChunk,
    ModelType,
)
from camel.utils import (
    BaseTokenCounter,
    OpenAITokenCounter,
    api_keys_required,
)

UNSUPPORTED_PARAMS = {
    "temperature",
    "top_p",
    "presence_penalty",
    "frequency_penalty",
    "logprobs",
    "top_logprobs",
    "logit_bias",
}


[docs] class OpenAIModel(BaseModelBackend): r"""OpenAI API in a unified BaseModelBackend interface. Args: model_type (Union[ModelType, str]): Model for which a backend is created, one of GPT_* series. model_config_dict (Optional[Dict[str, Any]], optional): A dictionary that will be fed into:obj:`openai.ChatCompletion.create()`. If :obj:`None`, :obj:`ChatGPTConfig().as_dict()` will be used. (default: :obj:`None`) api_key (Optional[str], optional): The API key for authenticating with the OpenAI service. (default: :obj:`None`) url (Optional[str], optional): The url to the OpenAI service. (default: :obj:`None`) token_counter (Optional[BaseTokenCounter], optional): Token counter to use for the model. If not provided, :obj:`OpenAITokenCounter` will be used. (default: :obj:`None`) timeout (Optional[float], optional): The timeout value in seconds for API calls. If not provided, will fall back to the MODEL_TIMEOUT environment variable or default to 180 seconds. (default: :obj:`None`) """ @api_keys_required( [ ("api_key", "OPENAI_API_KEY"), ] ) def __init__( self, model_type: Union[ModelType, str], model_config_dict: Optional[Dict[str, Any]] = None, api_key: Optional[str] = None, url: Optional[str] = None, token_counter: Optional[BaseTokenCounter] = None, timeout: Optional[float] = None, ) -> None: if model_config_dict is None: model_config_dict = ChatGPTConfig().as_dict() api_key = api_key or os.environ.get("OPENAI_API_KEY") url = url or os.environ.get("OPENAI_API_BASE_URL") timeout = timeout or float(os.environ.get("MODEL_TIMEOUT", 180)) super().__init__( model_type, model_config_dict, api_key, url, token_counter, timeout ) self._client = OpenAI( timeout=self._timeout, max_retries=3, base_url=self._url, api_key=self._api_key, ) self._async_client = AsyncOpenAI( timeout=self._timeout, max_retries=3, base_url=self._url, api_key=self._api_key, ) def _sanitize_config(self, config_dict: Dict[str, Any]) -> Dict[str, Any]: r"""Sanitize the model configuration for O1 models.""" if self.model_type in [ ModelType.O1, ModelType.O1_MINI, ModelType.O1_PREVIEW, ModelType.O3_MINI, ]: warnings.warn( "Warning: You are using an reasoning model (O1 or O3), " "which has certain limitations, reference: " "`https://platform.openai.com/docs/guides/reasoning`.", UserWarning, ) return { k: v for k, v in config_dict.items() if k not in UNSUPPORTED_PARAMS } return config_dict def _adapt_messages_for_o1_models( self, messages: List[OpenAIMessage] ) -> List[OpenAIMessage]: r"""Adjust message roles to comply with O1 model requirements by converting 'system' or 'developer' to 'user' role. Args: messages (List[OpenAIMessage]): Message list with the chat history in OpenAI API format. Returns: processed_messages (List[OpenAIMessage]): Return a new list of messages to avoid mutating input. """ # Define supported O1 model types as a class constant would be better O1_MODEL_TYPES = {ModelType.O1_MINI, ModelType.O1_PREVIEW} if self.model_type not in O1_MODEL_TYPES: return messages.copy() # Issue warning only once using class state if not hasattr(self, "_o1_warning_issued"): warnings.warn( "O1 models (O1_MINI/O1_PREVIEW) have role limitations: " "System or Developer messages will be converted to user role." "Reference: https://community.openai.com/t/" "developer-role-not-accepted-for-o1-o1-mini-o3-mini/1110750/7", UserWarning, stacklevel=2, ) self._o1_warning_issued = True # Create new message list to avoid mutating input processed_messages = [] for message in messages: processed_message = message.copy() if ( processed_message["role"] == "system" or processed_message["role"] == "developer" ): processed_message["role"] = "user" # type: ignore[arg-type] processed_messages.append(processed_message) return processed_messages @property def token_counter(self) -> BaseTokenCounter: r"""Initialize the token counter for the model backend. Returns: BaseTokenCounter: The token counter following the model's tokenization style. """ if not self._token_counter: self._token_counter = OpenAITokenCounter(self.model_type) return self._token_counter def _run( self, messages: List[OpenAIMessage], response_format: Optional[Type[BaseModel]] = None, tools: Optional[List[Dict[str, Any]]] = None, ) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]: r"""Runs inference of OpenAI chat completion. Args: messages (List[OpenAIMessage]): Message list with the chat history in OpenAI API format. response_format (Optional[Type[BaseModel]]): The format of the response. tools (Optional[List[Dict[str, Any]]]): The schema of the tools to use for the request. Returns: Union[ChatCompletion, Stream[ChatCompletionChunk]]: `ChatCompletion` in the non-stream mode, or `Stream[ChatCompletionChunk]` in the stream mode. """ messages = self._adapt_messages_for_o1_models(messages) response_format = response_format or self.model_config_dict.get( "response_format", None ) if response_format: return self._request_parse(messages, response_format, tools) else: return self._request_chat_completion(messages, tools) async def _arun( self, messages: List[OpenAIMessage], response_format: Optional[Type[BaseModel]] = None, tools: Optional[List[Dict[str, Any]]] = None, ) -> Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]: r"""Runs inference of OpenAI chat completion in async mode. Args: messages (List[OpenAIMessage]): Message list with the chat history in OpenAI API format. response_format (Optional[Type[BaseModel]]): The format of the response. tools (Optional[List[Dict[str, Any]]]): The schema of the tools to use for the request. Returns: Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]: `ChatCompletion` in the non-stream mode, or `AsyncStream[ChatCompletionChunk]` in the stream mode. """ response_format = response_format or self.model_config_dict.get( "response_format", None ) if response_format: return await self._arequest_parse(messages, response_format, tools) else: return await self._arequest_chat_completion(messages, tools) def _request_chat_completion( self, messages: List[OpenAIMessage], tools: Optional[List[Dict[str, Any]]] = None, ) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]: import copy request_config = copy.deepcopy(self.model_config_dict) if tools: request_config["tools"] = tools request_config = self._sanitize_config(request_config) return self._client.chat.completions.create( messages=messages, model=self.model_type, **request_config, ) async def _arequest_chat_completion( self, messages: List[OpenAIMessage], tools: Optional[List[Dict[str, Any]]] = None, ) -> Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]: import copy request_config = copy.deepcopy(self.model_config_dict) if tools: request_config["tools"] = tools request_config = self._sanitize_config(request_config) return await self._async_client.chat.completions.create( messages=messages, model=self.model_type, **request_config, ) def _request_parse( self, messages: List[OpenAIMessage], response_format: Type[BaseModel], tools: Optional[List[Dict[str, Any]]] = None, ) -> ChatCompletion: import copy request_config = copy.deepcopy(self.model_config_dict) request_config["response_format"] = response_format # Remove stream from request config since OpenAI does not support it # with structured response request_config.pop("stream", None) if tools is not None: request_config["tools"] = tools request_config = self._sanitize_config(request_config) return self._client.beta.chat.completions.parse( messages=messages, model=self.model_type, **request_config, ) async def _arequest_parse( self, messages: List[OpenAIMessage], response_format: Type[BaseModel], tools: Optional[List[Dict[str, Any]]] = None, ) -> ChatCompletion: import copy request_config = copy.deepcopy(self.model_config_dict) request_config["response_format"] = response_format # Remove stream from request config since OpenAI does not support it # with structured response request_config.pop("stream", None) if tools is not None: request_config["tools"] = tools request_config = self._sanitize_config(request_config) return await self._async_client.beta.chat.completions.parse( messages=messages, model=self.model_type, **request_config, )
[docs] def check_model_config(self): r"""Check whether the model configuration contains any unexpected arguments to OpenAI API. Raises: ValueError: If the model configuration dictionary contains any unexpected arguments to OpenAI API. """ for param in self.model_config_dict: if param not in OPENAI_API_PARAMS: raise ValueError( f"Unexpected argument `{param}` is " "input into OpenAI model backend." )
@property def stream(self) -> bool: r"""Returns whether the model is in stream mode, which sends partial results each time. Returns: bool: Whether the model is in stream mode. """ return self.model_config_dict.get('stream', False)