Source code for camel.models.qwen_model

# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========

import os
import time
from typing import Any, Dict, List, Optional, Union

from openai import AsyncStream, Stream

from camel.configs import QWEN_API_PARAMS, QwenConfig
from camel.messages import OpenAIMessage
from camel.models.openai_compatible_model import OpenAICompatibleModel
from camel.types import (
    ChatCompletion,
    ChatCompletionChunk,
    ModelType,
)
from camel.utils import (
    BaseTokenCounter,
    api_keys_required,
)


class QwenModel(OpenAICompatibleModel):
    r"""Qwen API in a unified OpenAICompatibleModel interface.

    Args:
        model_type (Union[ModelType, str]): Model for which a backend is
            created, one of Qwen series.
        model_config_dict (Optional[Dict[str, Any]], optional): A dictionary
            that will be fed into :obj:`openai.ChatCompletion.create()`. If
            :obj:`None`, :obj:`QwenConfig().as_dict()` will be used.
            (default: :obj:`None`)
        api_key (Optional[str], optional): The API key for authenticating
            with the Qwen service. (default: :obj:`None`)
        url (Optional[str], optional): The url to the Qwen service.
            (default: :obj:`https://dashscope.aliyuncs.com/compatible-mode/v1`)
        token_counter (Optional[BaseTokenCounter], optional): Token counter
            to use for the model. If not provided, :obj:`OpenAITokenCounter(
            ModelType.GPT_4O_MINI)` will be used. (default: :obj:`None`)
        timeout (Optional[float], optional): The timeout value in seconds
            for API calls. If not provided, will fall back to the
            MODEL_TIMEOUT environment variable or default to 180 seconds.
            (default: :obj:`None`)
    """

    @api_keys_required(
        [
            ("api_key", "QWEN_API_KEY"),
        ]
    )
    def __init__(
        self,
        model_type: Union[ModelType, str],
        model_config_dict: Optional[Dict[str, Any]] = None,
        api_key: Optional[str] = None,
        url: Optional[str] = None,
        token_counter: Optional[BaseTokenCounter] = None,
        timeout: Optional[float] = None,
    ) -> None:
        if model_config_dict is None:
            model_config_dict = QwenConfig().as_dict()
        api_key = api_key or os.environ.get("QWEN_API_KEY")
        url = url or os.environ.get(
            "QWEN_API_BASE_URL",
            "https://dashscope.aliyuncs.com/compatible-mode/v1",
        )
        timeout = timeout or float(os.environ.get("MODEL_TIMEOUT", 180))
        super().__init__(
            model_type=model_type,
            model_config_dict=model_config_dict,
            api_key=api_key,
            url=url,
            token_counter=token_counter,
            timeout=timeout,
        )

    def _post_handle_response(
        self, response: Union[ChatCompletion, Stream[ChatCompletionChunk]]
    ) -> ChatCompletion:
        r"""Handle reasoning content with <think> tags at the beginning."""
        if not isinstance(response, Stream):
            # Handle non-streaming response (existing logic)
            if self.model_config_dict.get("extra_body", {}).get(
                "enable_thinking", False
            ):
                reasoning_content = response.choices[
                    0
                ].message.reasoning_content  # type: ignore[attr-defined]
                combined_content = (
                    f"<think>\n{reasoning_content}\n</think>\n"
                    if reasoning_content
                    else ""
                )
                response_content = response.choices[0].message.content or ""
                combined_content += response_content

                # Construct a new ChatCompletion with combined content
                return ChatCompletion.construct(
                    id=response.id,
                    choices=[
                        dict(
                            finish_reason=response.choices[0].finish_reason,
                            index=response.choices[0].index,
                            logprobs=response.choices[0].logprobs,
                            message=dict(
                                role=response.choices[0].message.role,
                                content=combined_content,
                            ),
                        )
                    ],
                    created=response.created,
                    model=response.model,
                    object="chat.completion",
                    system_fingerprint=response.system_fingerprint,
                    usage=response.usage,
                )
            else:
                return response  # Return original if no thinking enabled

        # Handle streaming response
        accumulated_reasoning = ""
        accumulated_content = ""
        final_chunk = None
        usage_data = None  # Initialize usage data
        role = "assistant"  # Default role

        for chunk in response:
            final_chunk = chunk  # Keep track of the last chunk for metadata
            if chunk.choices:
                delta = chunk.choices[0].delta
                if delta.role:
                    role = delta.role  # Update role if provided
                if (
                    hasattr(delta, 'reasoning_content')
                    and delta.reasoning_content
                ):
                    accumulated_reasoning += delta.reasoning_content
                if delta.content:
                    accumulated_content += delta.content
            if hasattr(chunk, 'usage') and chunk.usage:
                usage_data = chunk.usage

        combined_content = (
            f"<think>\n{accumulated_reasoning}\n</think>\n"
            if accumulated_reasoning
            else ""
        ) + accumulated_content

        # Construct the final ChatCompletion object from accumulated
        # stream data
        if final_chunk:
            finish_reason = "stop"  # Default finish reason
            logprobs = None

            if final_chunk.choices:
                finish_reason = (
                    final_chunk.choices[0].finish_reason or finish_reason
                )
                if hasattr(final_chunk.choices[0], 'logprobs'):
                    logprobs = final_chunk.choices[0].logprobs

            return ChatCompletion.construct(
                # Use data from the final chunk or defaults
                id=final_chunk.id
                if hasattr(final_chunk, 'id')
                else "streamed-completion",
                choices=[
                    dict(
                        finish_reason=finish_reason,
                        index=0,
                        logprobs=logprobs,
                        message=dict(
                            role=role,
                            content=combined_content,
                        ),
                    )
                ],
                created=final_chunk.created
                if hasattr(final_chunk, 'created')
                else int(time.time()),
                model=final_chunk.model
                if hasattr(final_chunk, 'model')
                else self.model_type,
                object="chat.completion",
                system_fingerprint=final_chunk.system_fingerprint
                if hasattr(final_chunk, 'system_fingerprint')
                else None,
                usage=usage_data,
            )
        else:
            # Handle cases where the stream was empty or invalid
            return ChatCompletion.construct(
                id="empty-stream",
                choices=[
                    dict(
                        finish_reason="error",
                        index=0,
                        message=dict(role="assistant", content=""),
                    )
                ],
                created=int(time.time()),
                model=self.model_type,
                object="chat.completion",
                usage=usage_data,
            )

    def _request_chat_completion(
        self,
        messages: List[OpenAIMessage],
        tools: Optional[List[Dict[str, Any]]] = None,
    ) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]:
        request_config = self.model_config_dict.copy()

        if tools:
            request_config["tools"] = tools

        return self._post_handle_response(
            self._client.chat.completions.create(
                messages=messages,
                model=self.model_type,
                **request_config,
            )
        )

    async def _arequest_chat_completion(
        self,
        messages: List[OpenAIMessage],
        tools: Optional[List[Dict[str, Any]]] = None,
    ) -> Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]:
        request_config = self.model_config_dict.copy()

        if tools:
            request_config["tools"] = tools

        response = await self._async_client.chat.completions.create(
            messages=messages,
            model=self.model_type,
            **request_config,
        )
        return self._post_handle_response(response)
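
For reference, a minimal usage sketch of the thinking-mode handling above. It is not part of the module source: it assumes QWEN_API_KEY is exported, that ModelType.QWEN_PLUS exists in the installed CAMEL version, and that extra_body is an accepted Qwen config parameter; the extra_body["enable_thinking"] key itself is exactly the switch _post_handle_response reads.

# Usage sketch (illustrative; assumes QWEN_API_KEY is set and that
# ModelType.QWEN_PLUS and the `extra_body` parameter are available).
from camel.configs import QwenConfig
from camel.models.qwen_model import QwenModel
from camel.types import ModelType

model = QwenModel(
    model_type=ModelType.QWEN_PLUS,
    model_config_dict={
        **QwenConfig(temperature=0.2).as_dict(),
        # _post_handle_response checks this key before prepending the
        # <think>...</think> block to the message content.
        "extra_body": {"enable_thinking": True},
    },
)

completion = model._request_chat_completion(
    messages=[{"role": "user", "content": "Why is the sky blue?"}]
)
choice = completion.choices[0]
# ChatCompletion.construct() does not coerce nested dicts, so the rebuilt
# message may be a plain dict rather than a message object:
message = choice["message"] if isinstance(choice, dict) else choice.message
content = message["content"] if isinstance(message, dict) else message.content
print(content)  # "<think>\n...reasoning...\n</think>\n<answer text>"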
    def check_model_config(self):
        r"""Check whether the model configuration contains any
        unexpected arguments to Qwen API.

        Raises:
            ValueError: If the model configuration dictionary contains any
                unexpected arguments to Qwen API.
        """
        for param in self.model_config_dict:
            if param not in QWEN_API_PARAMS:
                raise ValueError(
                    f"Unexpected argument `{param}` is "
                    "input into Qwen model backend."
                )
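
A short illustration of this validation (the unknown key is hypothetical; depending on the CAMEL version the constructor may already run this check, in which case construction itself raises):

# Illustration: an unrecognized key fails validation against QWEN_API_PARAMS.
model = QwenModel(
    model_type=ModelType.QWEN_PLUS,  # assumes this enum member exists
    model_config_dict={"temperature": 0.2, "not_a_real_param": 1},
)
model.check_model_config()
# ValueError: Unexpected argument `not_a_real_param` is input into Qwen
# model backend.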