Source code for camel.models.watsonx_model

# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
import os
from typing import Any, Dict, List, Optional, Type, Union

from pydantic import BaseModel

from camel.configs import WATSONX_API_PARAMS, WatsonXConfig
from camel.logger import get_logger
from camel.messages import OpenAIMessage
from camel.models import BaseModelBackend
from camel.models._utils import try_modify_message_with_format
from camel.types import ChatCompletion, ModelType
from camel.utils import (
    BaseTokenCounter,
    OpenAITokenCounter,
    api_keys_required,
)

logger = get_logger(__name__)


class WatsonXModel(BaseModelBackend):
    r"""WatsonX API in a unified BaseModelBackend interface.

    Args:
        model_type (Union[ModelType, str]): Model type for which a backend
            is created, one of WatsonX series.
        model_config_dict (Optional[Dict[str, Any]], optional): A dictionary
            that will be fed into :obj:`ModelInference.chat()`. If
            :obj:`None`, :obj:`WatsonXConfig().as_dict()` will be used.
            (default: :obj:`None`)
        api_key (Optional[str], optional): The API key for authenticating
            with the WatsonX service. (default: :obj:`None`)
        url (Optional[str], optional): The URL of the WatsonX service.
            (default: :obj:`None`)
        project_id (Optional[str], optional): The project ID used to
            authenticate with the WatsonX service. (default: :obj:`None`)
        token_counter (Optional[BaseTokenCounter], optional): Token counter
            to use for the model. If not provided,
            :obj:`OpenAITokenCounter(ModelType.GPT_4O_MINI)` will be used.
            (default: :obj:`None`)
        timeout (Optional[float], optional): The timeout value in seconds
            for API calls. If not provided, will fall back to the
            MODEL_TIMEOUT environment variable or default to 180 seconds.
            (default: :obj:`None`)
    """

    @api_keys_required(
        [
            ("api_key", 'WATSONX_API_KEY'),
            ("project_id", 'WATSONX_PROJECT_ID'),
        ]
    )
    def __init__(
        self,
        model_type: Union[ModelType, str],
        model_config_dict: Optional[Dict[str, Any]] = None,
        api_key: Optional[str] = None,
        url: Optional[str] = None,
        project_id: Optional[str] = None,
        token_counter: Optional[BaseTokenCounter] = None,
        timeout: Optional[float] = None,
    ):
        from ibm_watsonx_ai import APIClient, Credentials
        from ibm_watsonx_ai.foundation_models import ModelInference

        if model_config_dict is None:
            model_config_dict = WatsonXConfig().as_dict()
        api_key = api_key or os.environ.get("WATSONX_API_KEY")
        url = url or os.environ.get(
            "WATSONX_URL", "https://jp-tok.ml.cloud.ibm.com"
        )
        project_id = project_id or os.environ.get("WATSONX_PROJECT_ID")
        timeout = timeout or float(os.environ.get("MODEL_TIMEOUT", 180))
        super().__init__(
            model_type, model_config_dict, api_key, url, token_counter,
            timeout
        )
        self._project_id = project_id

        credentials = Credentials(api_key=self._api_key, url=self._url)
        client = APIClient(credentials, project_id=self._project_id)
        self._model = ModelInference(
            model_id=self.model_type,
            api_client=client,
            params=model_config_dict,
        )

    def _to_openai_response(self, response: Dict[str, Any]) -> ChatCompletion:
        r"""Convert a WatsonX response to the OpenAI format."""
        if not response:
            raise ValueError("Empty response from WatsonX API")

        # Extract usage information
        usage = response.get("usage", {})

        # Create an OpenAI-compatible response
        obj = ChatCompletion.construct(
            id=response.get("id", ""),
            choices=response.get("choices", []),
            created=response.get("created"),
            model=self.model_type,
            object="chat.completion",
            usage=usage,
        )
        return obj

    @property
    def token_counter(self) -> BaseTokenCounter:
        r"""Initialize the token counter for the model backend.

        Returns:
            BaseTokenCounter: The token counter following the model's
                tokenization style.
        """
        if not self._token_counter:
            self._token_counter = OpenAITokenCounter(
                model=ModelType.GPT_4O_MINI
            )
        return self._token_counter

    def _prepare_request(
        self,
        messages: List[OpenAIMessage],
        response_format: Optional[Type[BaseModel]] = None,
        tools: Optional[List[Dict[str, Any]]] = None,
    ) -> Dict[str, Any]:
        import copy

        request_config = copy.deepcopy(self.model_config_dict)
        if tools:
            request_config["tools"] = tools
        elif response_format:
            try_modify_message_with_format(messages[-1], response_format)
            request_config["response_format"] = {"type": "json_object"}

        return request_config

    def _run(
        self,
        messages: List[OpenAIMessage],
        response_format: Optional[Type[BaseModel]] = None,
        tools: Optional[List[Dict[str, Any]]] = None,
    ) -> ChatCompletion:
        r"""Runs inference of WatsonX chat completion.

        Args:
            messages (List[OpenAIMessage]): Message list with the chat
                history in OpenAI API format.
            response_format (Optional[Type[BaseModel]], optional): The
                response format. (default: :obj:`None`)
            tools (Optional[List[Dict[str, Any]]], optional): Tools to use.
                (default: :obj:`None`)

        Returns:
            ChatCompletion.
        """
        try:
            request_config = self._prepare_request(
                messages, response_format, tools
            )
            # WatsonX expects messages as a list of dictionaries
            response = self._model.chat(
                messages=messages,
                params=request_config,
                tools=tools,
            )
            openai_response = self._to_openai_response(response)
            return openai_response
        except Exception as e:
            logger.error(f"Unexpected error when calling WatsonX API: {e!s}")
            raise

    async def _arun(
        self,
        messages: List[OpenAIMessage],
        response_format: Optional[Type[BaseModel]] = None,
        tools: Optional[List[Dict[str, Any]]] = None,
    ) -> ChatCompletion:
        r"""Runs inference of WatsonX chat completion asynchronously.

        Args:
            messages (List[OpenAIMessage]): Message list with the chat
                history in OpenAI API format.
            response_format (Optional[Type[BaseModel]], optional): The
                response format. (default: :obj:`None`)
            tools (Optional[List[Dict[str, Any]]], optional): Tools to use.
                (default: :obj:`None`)

        Returns:
            ChatCompletion.
        """
        try:
            request_config = self._prepare_request(
                messages, response_format, tools
            )
            # WatsonX expects messages as a list of dictionaries
            response = await self._model.achat(
                messages=messages,
                params=request_config,
                tools=tools,
            )
            openai_response = self._to_openai_response(response)
            return openai_response
        except Exception as e:
            logger.error(f"Unexpected error when calling WatsonX API: {e!s}")
            raise
    def check_model_config(self):
        r"""Check whether the model configuration contains any unexpected
        arguments to WatsonX API.

        Raises:
            ValueError: If the model configuration dictionary contains any
                unexpected arguments to WatsonX API.
        """
        for param in self.model_config_dict:
            if param not in WATSONX_API_PARAMS:
                raise ValueError(
                    f"Unexpected argument `{param}` is "
                    "input into WatsonX model backend."
                )
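    # Illustrative: a config key that WatsonX does not accept is rejected.
    # ``bogus_flag`` is a made-up key and ``temperature`` is assumed to be a
    # valid WatsonX parameter; depending on the base class, the check may
    # already run inside ``__init__``.
    #
    #     model = WatsonXModel(
    #         model_type="ibm/granite-3-8b-instruct",  # hypothetical id
    #         model_config_dict={"temperature": 0.2, "bogus_flag": True},
    #     )
    #     model.check_model_config()
    #     # ValueError: Unexpected argument `bogus_flag` is input into
    #     # WatsonX model backend.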
    @property
    def stream(self) -> bool:
        r"""Returns whether the model is in stream mode, which sends partial
        results each time.

        Returns:
            bool: Whether the model is in stream mode.
        """
        return False
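
# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the module). Assumes the public
# ``run()`` entry point inherited from :obj:`BaseModelBackend`, a
# hypothetical Granite model id, and credentials supplied via the
# WATSONX_API_KEY and WATSONX_PROJECT_ID environment variables.
#
#     from camel.models import WatsonXModel
#
#     model = WatsonXModel(model_type="ibm/granite-3-8b-instruct")
#     response = model.run(
#         messages=[{"role": "user", "content": "Say hello."}]
#     )
#     # ``choices`` is passed through from the WatsonX payload by
#     # ``_to_openai_response``, so each entry is a raw dict rather than a
#     # parsed OpenAI object:
#     print(response.choices[0]["message"]["content"])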