Source code for camel.models.model_manager

# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========

import logging
from itertools import cycle
from random import choice
from typing import (
    Any,
    Callable,
    Dict,
    List,
    Union,
)

from openai import Stream

from camel.messages import OpenAIMessage
from camel.models.base_model import BaseModelBackend
from camel.types import (
    ChatCompletion,
    ChatCompletionChunk,
    UnifiedModelType,
)
from camel.utils import BaseTokenCounter

logger = logging.getLogger(__name__)


[docs] class ModelProcessingError(Exception): r"""Raised when an error occurs during model processing.""" pass
[docs] class ModelManager: r"""ModelManager choosing a model from provided list. Models are picked according to defined strategy. Args: models(Union[BaseModelBackend, List[BaseModelBackend]]): model backend or list of model backends (e.g., model instances, APIs) scheduling_strategy (str): name of function that defines how to select the next model. (default: :str:`round_robin`) """ def __init__( self, models: Union[BaseModelBackend, List[BaseModelBackend]], scheduling_strategy: str = "round_robin", ): if isinstance(models, list): self.models = models else: self.models = [models] self.models_cycle = cycle(self.models) self.current_model = self.models[0] # Set the scheduling strategy; default is round-robin try: self.scheduling_strategy = getattr(self, scheduling_strategy) except AttributeError: logger.warning( f"Provided strategy: {scheduling_strategy} is not implemented." f"Using default 'round robin'" ) self.scheduling_strategy = self.round_robin @property def model_type(self) -> UnifiedModelType: r"""Return type of the current model. Returns: Union[ModelType, str]: Current model type. """ return self.current_model.model_type @property def model_config_dict(self) -> Dict[str, Any]: r"""Return model_config_dict of the current model. Returns: Dict[str, Any]: Config dictionary of the current model. """ return self.current_model.model_config_dict @model_config_dict.setter def model_config_dict(self, model_config_dict: Dict[str, Any]): r"""Set model_config_dict to the current model. Args: model_config_dict (Dict[str, Any]): Config dictionary to be set at current model. """ self.current_model.model_config_dict = model_config_dict @property def current_model_index(self) -> int: r"""Return the index of current model in self.models list. Returns: int: index of current model in given list of models. """ return self.models.index(self.current_model) @property def token_limit(self): r"""Returns the maximum token limit for current model. This method retrieves the maximum token limit either from the `model_config_dict` or from the model's default token limit. Returns: int: The maximum token limit for the given model. """ return self.current_model.token_limit @property def token_counter(self) -> BaseTokenCounter: r"""Return token_counter of the current model. Returns: BaseTokenCounter: The token counter following the model's tokenization style. """ return self.current_model.token_counter
[docs] def add_strategy(self, name: str, strategy_fn: Callable): r"""Add a scheduling strategy method provided by user in case when none of existent strategies fits. When custom strategy is provided, it will be set as "self.scheduling_strategy" attribute. Args: name (str): The name of the strategy. strategy_fn (Callable): The scheduling strategy function. """ if not callable(strategy_fn): raise ValueError("strategy_fn must be a callable function.") setattr(self, name, strategy_fn.__get__(self)) self.scheduling_strategy = getattr(self, name) logger.info(f"Custom strategy '{name}' added.")
# Strategies
[docs] def round_robin(self) -> BaseModelBackend: r"""Return models one by one in simple round-robin fashion. Returns: BaseModelBackend for processing incoming messages. """ return next(self.models_cycle)
[docs] def always_first(self) -> BaseModelBackend: r"""Always return the first model from self.models. Returns: BaseModelBackend for processing incoming messages. """ return self.models[0]
[docs] def random_model(self) -> BaseModelBackend: r"""Return random model from self.models list. Returns: BaseModelBackend for processing incoming messages. """ return choice(self.models)
[docs] def run( self, messages: List[OpenAIMessage] ) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]: r"""Process a list of messages by selecting a model based on the scheduling strategy. Sends the entire list of messages to the selected model, and returns a single response. Args: messages (List[OpenAIMessage]): Message list with the chat history in OpenAI API format. Returns: Union[ChatCompletion, Stream[ChatCompletionChunk]]: `ChatCompletion` in the non-stream mode, or `Stream[ChatCompletionChunk]` in the stream mode. """ self.current_model = self.scheduling_strategy() # Pass all messages to the selected model and get the response try: response = self.current_model.run(messages) except Exception as exc: logger.error(f"Error processing with model: {self.current_model}") if self.scheduling_strategy == self.always_first: self.scheduling_strategy = self.round_robin logger.warning( "The scheduling strategy has been changed to 'round_robin'" ) # Skip already used one self.current_model = self.scheduling_strategy() raise exc return response