Mem0Storage

class Mem0Storage(BaseKeyValueStorage):

A concrete implementation of the :obj:BaseKeyValueStorage using Mem0 as the backend. This storage system uses Mem0’s text capabilities to store, search, and manage text with context.

Parameters:

  • agent_id (str): Default agent ID to associate memories with.
  • api_key (str, optional): The API key for authentication. If not provided, will try to get from environment variable MEM0_API_KEY (default: :obj:None).
  • user_id (str, optional): Default user ID to associate memories with (default: :obj:None).
  • metadata (Dict[str, Any], optional): Default metadata to include with all memories (default: :obj:None).
  • References:
  • https: //docs.mem0.ai

init

def __init__(
    self,
    agent_id: str,
    api_key: Optional[str] = None,
    user_id: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None
):

_prepare_options

def _prepare_options(
    self,
    agent_id: Optional[str] = None,
    user_id: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
    **kwargs: Any
):

Helper method to prepare options for Mem0 API calls.

Parameters:

  • agent_id (Optional[str], optional): Agent ID to use (default: :obj:None).
  • user_id (Optional[str], optional): User ID to use (default: :obj:None).
  • metadata (Optional[Dict[str, Any]], optional): Additional metadata to include (default: :obj:None). **kwargs (Any): Additional keyword arguments.

Returns:

Dict[str, Any]: Prepared options dictionary for API calls.

_prepare_filters

def _prepare_filters(
    self,
    agent_id: Optional[str] = None,
    user_id: Optional[str] = None,
    filters: Optional[Dict[str, Any]] = None
):

Helper method to prepare filters for Mem0 API calls.

Parameters:

  • agent_id (Optional[str], optional): Agent ID to filter by (default: :obj:None).
  • user_id (Optional[str], optional): User ID to filter by (default: :obj:None).
  • filters (Optional[Dict[str, Any]], optional): Additional filters (default: :obj:None).

Returns:

Dict[str, Any]: Prepared filters dictionary for API calls.

_prepare_messages

def _prepare_messages(self, records: List[Dict[str, Any]]):

Prepare messages from records for Mem0 API calls.

Parameters:

  • records (List[Dict[str, Any]]): List of record dictionaries.

Returns:

List[Dict[str, Any]]: List of prepared message dictionaries.

save

def save(self, records: List[Dict[str, Any]]):

Saves a batch of records to the Mem0 storage system.

Parameters:

  • records (List[Dict[str, Any]]): A list of dictionaries, where each dictionary represents a unique record to be stored.

load

def load(self):

Returns:

List[Dict[str, Any]]: A list of dictionaries, where each dictionary represents a stored record.

clear

def clear(self):

Removes all records from the Mem0 storage system.