Source code for camel.storages.key_value_storages.mem0_cloud

# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========

import logging
import os
from datetime import datetime
from typing import Any, Dict, List, Optional
from uuid import UUID

from camel.memories.records import MemoryRecord
from camel.messages import BaseMessage
from camel.storages.key_value_storages import BaseKeyValueStorage
from camel.types import OpenAIBackendRole, RoleType

logger = logging.getLogger(__name__)


[docs] class Mem0Storage(BaseKeyValueStorage): r"""A concrete implementation of the :obj:`BaseKeyValueStorage` using Mem0 as the backend. This storage system uses Mem0's text capabilities to store, search, and manage text with context. Args: agent_id (str): Default agent ID to associate memories with. api_key (str, optional): The API key for authentication. If not provided, will try to get from environment variable MEM0_API_KEY (default: :obj:`None`). user_id (str, optional): Default user ID to associate memories with (default: :obj:`None`). metadata (Dict[str, Any], optional): Default metadata to include with all memories (default: :obj:`None`). References: https://docs.mem0.ai """ def __init__( self, agent_id: str, api_key: Optional[str] = None, user_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, ) -> None: try: from mem0 import MemoryClient except ImportError as exc: logger.error( "Please install `mem0` first. You can install it by " "running `pip install mem0ai`." ) raise exc self.api_key = api_key or os.getenv("MEM0_API_KEY") if not self.api_key: raise ValueError( "API key must be provided either through constructor " "or MEM0_API_KEY environment variable." ) self.client = MemoryClient(api_key=self.api_key) self.agent_id = agent_id self.user_id = user_id self.metadata = metadata or {} def _prepare_options( self, agent_id: Optional[str] = None, user_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> Dict[str, Any]: r"""Helper method to prepare options for Mem0 API calls. Args: agent_id (Optional[str], optional): Agent ID to use (default: :obj:`None`). user_id (Optional[str], optional): User ID to use (default: :obj:`None`). metadata (Optional[Dict[str, Any]], optional): Additional metadata to include (default: :obj:`None`). **kwargs (Any): Additional keyword arguments. Returns: Dict[str, Any]: Prepared options dictionary for API calls. """ options = { "agent_id": agent_id or self.agent_id, "user_id": user_id or self.user_id, "metadata": {**self.metadata, **(metadata or {})}, "output_format": "v1.1", **kwargs, } return {k: v for k, v in options.items() if v is not None} def _prepare_filters( self, agent_id: Optional[str] = None, user_id: Optional[str] = None, filters: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: r"""Helper method to prepare filters for Mem0 API calls. Args: agent_id (Optional[str], optional): Agent ID to filter by (default: :obj:`None`). user_id (Optional[str], optional): User ID to filter by (default: :obj:`None`). filters (Optional[Dict[str, Any]], optional): Additional filters (default: :obj:`None`). Returns: Dict[str, Any]: Prepared filters dictionary for API calls. """ base_filters: Dict[str, Any] = {"AND": []} if filters: base_filters["AND"].append(filters) if agent_id or self.agent_id: base_filters["AND"].append({"agent_id": agent_id or self.agent_id}) if user_id or self.user_id: base_filters["AND"].append({"user_id": user_id or self.user_id}) return base_filters if base_filters["AND"] else {} def _prepare_messages( self, records: List[Dict[str, Any]], ) -> List[Dict[str, Any]]: r"""Prepare messages from records for Mem0 API calls. Args: records (List[Dict[str, Any]]): List of record dictionaries. Returns: List[Dict[str, Any]]: List of prepared message dictionaries. """ messages = [] for record in records: content = record["message"]["content"] role = record["role_at_backend"].value messages.append({"role": role, "content": content}) return messages
[docs] def save(self, records: List[Dict[str, Any]]) -> None: r"""Saves a batch of records to the Mem0 storage system. Args: records (List[Dict[str, Any]]): A list of dictionaries, where each dictionary represents a unique record to be stored. """ try: messages = self._prepare_messages(records) options = self._prepare_options( agent_id=self.agent_id, user_id=self.user_id, metadata=self.metadata, ) self.client.add(messages, **options) except Exception as e: logger.error(f"Error adding memory: {e}") logger.error(f"Error: {e}")
[docs] def load(self) -> List[Dict[str, Any]]: r"""Loads all stored records from the Mem0 storage system. Returns: List[Dict[str, Any]]: A list of dictionaries, where each dictionary represents a stored record. """ try: filters = self._prepare_filters( agent_id=self.agent_id, user_id=self.user_id, ) results = self.client.get_all(version="v2", **filters) # Transform results into MemoryRecord objects transformed_results = [] for result in results: memory_record = MemoryRecord( uuid=UUID(result["id"]), message=BaseMessage( role_name="user", role_type=RoleType.USER, meta_dict={}, content=result["memory"], ), role_at_backend=OpenAIBackendRole.USER, extra_info=result.get("metadata", {}), timestamp=datetime.fromisoformat( result["created_at"] ).timestamp(), agent_id=result.get("agent_id", ""), ) transformed_results.append(memory_record.to_dict()) return transformed_results except Exception as e: logger.error(f"Error searching memories: {e}") return []
[docs] def clear( self, ) -> None: r"""Removes all records from the Mem0 storage system.""" try: filters = self._prepare_filters( agent_id=self.agent_id, user_id=self.user_id, ) self.client.delete_users(**filters) except Exception as e: logger.error(f"Error deleting memories: {e}") logger.error(f"Error: {e}")