camel.storages package


Subpackages

Module contents

class camel.storages.BaseGraphStorage

Bases: ABC

An abstract base class for graph storage systems.

abstract add_triplet(subj: str, obj: str, rel: str) → None

Adds a relationship (triplet) between two entities in the database.

Parameters:
  • subj (str) – The identifier for the subject entity.

  • obj (str) – The identifier for the object entity.

  • rel (str) – The relationship between the subject and object.

abstract delete_triplet(subj: str, obj: str, rel: str) → None

Deletes a specific triplet, consisting of a subject, an object, and a relationship, from the graph.

Parameters:
  • subj (str) – The identifier for the subject entity.

  • obj (str) – The identifier for the object entity.

  • rel (str) – The relationship between the subject and object.

abstract property get_client: Any

Get the underlying graph storage client.

abstract property get_schema: str

Get the schema of the graph storage.

abstract property get_structured_schema: Dict[str, Any]

Get the structured schema of the graph storage.

abstract query(query: str, params: Dict[str, Any] | None = None) → List[Dict[str, Any]]

Queries the graph store with a statement and optional parameters.

Parameters:
  • query (str) – The query to be executed.

  • params (Optional[Dict[str, Any]]) – A dictionary of parameters to be used in the query. Defaults to None.

Returns:

A list of dictionaries, each representing a row of results from the query.

Return type:

List[Dict[str, Any]]

abstract refresh_schema() → None

Refreshes the graph schema information.
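To make the abstract contract concrete, here is a toy backend. It is a sketch only: GraphStorageInterface is a hypothetical standalone ABC that mirrors the documented member names (so the example runs without camel installed), InMemoryTripletGraph is likewise hypothetical, and its query() treats the query string as a relationship type rather than a real query language such as Cypher.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Set, Tuple


class GraphStorageInterface(ABC):
    """Hypothetical standalone mirror of the documented BaseGraphStorage members."""

    @property
    @abstractmethod
    def get_client(self) -> Any: ...

    @property
    @abstractmethod
    def get_schema(self) -> str: ...

    @property
    @abstractmethod
    def get_structured_schema(self) -> Dict[str, Any]: ...

    @abstractmethod
    def refresh_schema(self) -> None: ...

    @abstractmethod
    def add_triplet(self, subj: str, obj: str, rel: str) -> None: ...

    @abstractmethod
    def delete_triplet(self, subj: str, obj: str, rel: str) -> None: ...

    @abstractmethod
    def query(self, query: str, params: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]: ...


class InMemoryTripletGraph(GraphStorageInterface):
    """Toy backend: keeps (subj, rel, obj) triplets in a Python set."""

    def __init__(self) -> None:
        self._triplets: Set[Tuple[str, str, str]] = set()
        self._schema: Dict[str, Any] = {}
        self.refresh_schema()

    @property
    def get_client(self) -> Any:
        return self._triplets  # no real database client; expose the raw set

    @property
    def get_schema(self) -> str:
        return "Relationship types: " + ", ".join(self._schema["relationships"])

    @property
    def get_structured_schema(self) -> Dict[str, Any]:
        return self._schema

    def refresh_schema(self) -> None:
        # Recompute the set of relationship types currently stored.
        self._schema = {"relationships": sorted({r for _, r, _ in self._triplets})}

    def add_triplet(self, subj: str, obj: str, rel: str) -> None:
        self._triplets.add((subj, rel, obj))
        self.refresh_schema()

    def delete_triplet(self, subj: str, obj: str, rel: str) -> None:
        self._triplets.discard((subj, rel, obj))
        self.refresh_schema()

    def query(self, query: str, params: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        # Toy semantics: `query` names a relationship type; each match is one row.
        # `params` is accepted for interface compatibility but unused here.
        return [
            {"subj": s, "rel": r, "obj": o}
            for s, r, o in sorted(self._triplets)
            if r == query
        ]


g = InMemoryTripletGraph()
g.add_triplet("Alice", "Bob", "KNOWS")
g.add_triplet("Alice", "CAMEL", "USES")
print(g.query("KNOWS"))  # → [{'subj': 'Alice', 'rel': 'KNOWS', 'obj': 'Bob'}]
print(g.get_schema)  # → Relationship types: KNOWS, USES
g.delete_triplet("Alice", "Bob", "KNOWS")
print(g.query("KNOWS"))  # → []
```

Each query result row is a dictionary, matching the documented List[Dict[str, Any]] return type.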

class camel.storages.BaseKeyValueStorage

Bases: ABC

An abstract base class for key-value storage systems. It provides a consistent interface for saving, loading, and clearing data records without any loss of information, and it interacts primarily through Python dictionaries.

This class is meant to be inherited by multiple types of key-value storage implementations, including, but not limited to, JSON file storage, NoSQL databases such as MongoDB and Redis, and in-memory Python dictionaries.

abstract clear() → None

Removes all records from the key-value storage system.

abstract load() → List[Dict[str, Any]]

Loads all stored records from the key-value storage system.

Returns:

A list of dictionaries, where each dictionary represents a stored record.

Return type:

List[Dict[str, Any]]

abstract save(records: List[Dict[str, Any]]) → None

Saves a batch of records to the key-value storage system.

Parameters:

records (List[Dict[str, Any]]) – A list of dictionaries, where each dictionary represents a unique record to be stored.
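The save/load/clear contract can be illustrated with a list-backed store in the spirit of InMemoryKeyValueStorage. This is a sketch under stated assumptions: KeyValueStorageInterface and ListKeyValueStorage are hypothetical names that avoid importing camel, save() is assumed to append each batch rather than overwrite, and deep copies are one way to honor the "without any loss of information" promise (callers mutating their dictionaries later cannot alter stored records).

```python
from abc import ABC, abstractmethod
from copy import deepcopy
from typing import Any, Dict, List


class KeyValueStorageInterface(ABC):
    """Hypothetical standalone mirror of the documented save/load/clear contract."""

    @abstractmethod
    def save(self, records: List[Dict[str, Any]]) -> None: ...

    @abstractmethod
    def load(self) -> List[Dict[str, Any]]: ...

    @abstractmethod
    def clear(self) -> None: ...


class ListKeyValueStorage(KeyValueStorageInterface):
    """Keeps records in a Python list, so data is lost when the process exits."""

    def __init__(self) -> None:
        self._records: List[Dict[str, Any]] = []

    def save(self, records: List[Dict[str, Any]]) -> None:
        # Append copies so later mutation by the caller cannot alter stored data.
        self._records.extend(deepcopy(records))

    def load(self) -> List[Dict[str, Any]]:
        # Return copies for the same reason.
        return deepcopy(self._records)

    def clear(self) -> None:
        self._records = []


storage = ListKeyValueStorage()
storage.save([{"role": "user", "content": "hi"}])
storage.save([{"role": "assistant", "content": "hello"}])
print(len(storage.load()))  # → 2
storage.clear()
print(storage.load())  # → []
```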

class camel.storages.BaseVectorStorage

Bases: ABC

An abstract base class for vector storage systems.

abstract add(records: List[VectorRecord], **kwargs: Any) → None

Saves a list of vector records to the storage.

Parameters:
  • records (List[VectorRecord]) – List of vector records to be saved.

  • **kwargs (Any) – Additional keyword arguments.

Raises:

RuntimeError – If there is an error during the saving process.

abstract clear() → None

Removes all vectors from the storage.

abstract property client: Any

Provides access to the underlying vector database client.

abstract delete(ids: List[str], **kwargs: Any) → None

Deletes a list of vectors identified by their IDs from the storage.

Parameters:
  • ids (List[str]) – List of unique identifiers for the vectors to be deleted.

  • **kwargs (Any) – Additional keyword arguments.

Raises:

RuntimeError – If there is an error during the deletion process.

get_payloads_by_vector(vector: List[float], top_k: int) → List[Dict[str, Any]]

Returns the payloads of the top k vector records closest to the given vector.

This method is a wrapper around BaseVectorStorage.query.

Parameters:
  • vector (List[float]) – The search vector.

  • top_k (int) – The number of most similar vectors to retrieve.

Returns:

A list of vector payloads retrieved from the storage based on similarity to the query vector.

Return type:

List[Dict[str, Any]]

abstract load() → None

Loads the collection hosted on the cloud service.

abstract query(query: VectorDBQuery, **kwargs: Any) → List[VectorDBQueryResult]

Searches for similar vectors in the storage based on the provided query.

Parameters:
  • query (VectorDBQuery) – The query object containing the search vector and the number of top similar vectors to retrieve.

  • **kwargs (Any) – Additional keyword arguments.

Returns:

A list of vectors retrieved from the storage based on similarity to the query vector.

Return type:

List[VectorDBQueryResult]

abstract status() → VectorDBStatus

Returns the status of the vector database.

Returns:

The vector database status.

Return type:

VectorDBStatus
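The query/get_payloads_by_vector relationship can be sketched with a brute-force store that scores every record by cosine similarity. Everything here is hypothetical: Record, QueryResult, and BruteForceVectorStorage are stand-ins for VectorRecord, VectorDBQueryResult, and a BaseVectorStorage subclass; query() takes the vector and top_k directly instead of a VectorDBQuery; and the wrapper is assumed to skip records without a payload. Production backends such as Qdrant or Milvus use approximate indexes rather than a linear scan.

```python
import math
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class Record:
    """Hypothetical stand-in for VectorRecord."""
    vector: List[float]
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    payload: Optional[Dict[str, Any]] = None


@dataclass
class QueryResult:
    """Hypothetical stand-in for VectorDBQueryResult."""
    record: Record
    similarity: float


def cosine(a: List[float], b: List[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class BruteForceVectorStorage:
    """Linear-scan vector store keyed by record id."""

    def __init__(self, vector_dim: int) -> None:
        self.vector_dim = vector_dim
        self._records: Dict[str, Record] = {}

    def add(self, records: List[Record]) -> None:
        for r in records:
            if len(r.vector) != self.vector_dim:
                raise RuntimeError(f"expected dim {self.vector_dim}, got {len(r.vector)}")
            self._records[r.id] = r

    def delete(self, ids: List[str]) -> None:
        for i in ids:
            self._records.pop(i, None)

    def clear(self) -> None:
        self._records.clear()

    def query(self, query_vector: List[float], top_k: int = 1) -> List[QueryResult]:
        # Score every record, then keep the top_k most similar.
        scored = [QueryResult(r, cosine(query_vector, r.vector)) for r in self._records.values()]
        scored.sort(key=lambda res: res.similarity, reverse=True)
        return scored[:top_k]

    def get_payloads_by_vector(self, vector: List[float], top_k: int) -> List[Dict[str, Any]]:
        # Thin wrapper over query(): return only the payloads of the best matches.
        return [res.record.payload for res in self.query(vector, top_k)
                if res.record.payload is not None]


store = BruteForceVectorStorage(vector_dim=2)
store.add([
    Record([1.0, 0.0], payload={"text": "east"}),
    Record([0.0, 1.0], payload={"text": "north"}),
    Record([0.7, 0.7], payload={"text": "north-east"}),
])
print(store.get_payloads_by_vector([1.0, 0.1], top_k=2))
# → [{'text': 'east'}, {'text': 'north-east'}]
```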

class camel.storages.InMemoryKeyValueStorage

Bases: BaseKeyValueStorage

A concrete implementation of BaseKeyValueStorage that uses an in-memory list. It is ideal for temporary storage, as data is lost when the program ends.

clear() → None

Removes all records from the key-value storage system.

load() → List[Dict[str, Any]]

Loads all stored records from the key-value storage system.

Returns:

A list of dictionaries, where each dictionary represents a stored record.

Return type:

List[Dict[str, Any]]

save(records: List[Dict[str, Any]]) → None

Saves a batch of records to the key-value storage system.

Parameters:

records (List[Dict[str, Any]]) – A list of dictionaries, where each dictionary represents a unique record to be stored.

class camel.storages.JsonStorage(path: Path | None = None)

Bases: BaseKeyValueStorage

A concrete implementation of BaseKeyValueStorage that uses JSON files. It allows persistent storage of records in a human-readable format.

Parameters:

path (Path, optional) – Path to the desired JSON file. If None, a default path ./chat_history.json will be used. (default: None)

clear() → None

Removes all records from the key-value storage system.

load() → List[Dict[str, Any]]

Loads all stored records from the key-value storage system.

Returns:

A list of dictionaries, where each dictionary represents a stored record.

Return type:

List[Dict[str, Any]]

save(records: List[Dict[str, Any]]) → None

Saves a batch of records to the key-value storage system.

Parameters:

records (List[Dict[str, Any]]) – A list of dictionaries, where each dictionary represents a unique record to be stored.
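A file-backed variant shows how persistence changes the same three methods. This is a hypothetical sketch, not JsonStorage's implementation: JsonLinesStorage is an illustrative name, and the one-JSON-object-per-line layout, append-on-save, and truncate-on-clear behavior are assumptions (the real class's on-disk format may differ). Only the default path ./chat_history.json is taken from the description above.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, List, Optional


class JsonLinesStorage:
    """Hypothetical file-backed store: one JSON object per line."""

    def __init__(self, path: Optional[Path] = None) -> None:
        # Same default path as documented for JsonStorage; created on first save.
        self.path = Path(path) if path is not None else Path("./chat_history.json")

    def save(self, records: List[Dict[str, Any]]) -> None:
        # Append mode keeps earlier batches on disk.
        with self.path.open("a", encoding="utf-8") as f:
            for record in records:
                f.write(json.dumps(record, ensure_ascii=False) + "\n")

    def load(self) -> List[Dict[str, Any]]:
        if not self.path.exists():
            return []
        with self.path.open("r", encoding="utf-8") as f:
            return [json.loads(line) for line in f if line.strip()]

    def clear(self) -> None:
        # Truncate the file rather than deleting it.
        self.path.write_text("", encoding="utf-8")


with tempfile.TemporaryDirectory() as tmp:
    store = JsonLinesStorage(Path(tmp) / "history.json")
    store.save([{"role": "user", "content": "hi"}])
    store.save([{"role": "assistant", "content": "hello"}])
    print(len(store.load()))  # → 2
    store.clear()
    print(store.load())  # → []
```

Writing one record per line keeps the file human-readable and lets save() append without rewriting earlier batches.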

class camel.storages.MilvusStorage(vector_dim: int, url_and_api_key: Tuple[str, str], collection_name: str | None = None, **kwargs: Any)

Bases: BaseVectorStorage

An implementation of BaseVectorStorage for interacting with Milvus, a cloud-native vector search engine.

Detailed information about Milvus is available at: Milvus

Parameters:
  • vector_dim (int) – The dimension of the stored vectors.

  • url_and_api_key (Tuple[str, str]) – Tuple containing the URL and API key for connecting to a remote Milvus instance. The URL maps to the Milvus uri concept, typically “endpoint:port”. The API key maps to the Milvus token concept: for a self-hosted instance it is “username:pwd”; for Zilliz Cloud (fully managed Milvus) it is the API key.

  • collection_name (Optional[str], optional) – Name of the collection in Milvus. If not provided, it is set to the current time in ISO format. (default: None)

  • **kwargs (Any) – Additional keyword arguments for initializing MilvusClient.

Raises:

ImportError – If the pymilvus package is not installed.

add(records: List[VectorRecord], **kwargs) → None

Adds a list of vectors to the specified collection.

Parameters:
  • records (List[VectorRecord]) – List of vectors to be added.

  • **kwargs (Any) – Additional keyword arguments passed to insert.

Raises:

RuntimeError – If there was an error in the addition process.

clear() → None

Removes all vectors from the Milvus collection. This method deletes the existing collection and then recreates it with the same schema to effectively remove all stored vectors.

property client: Any

Provides direct access to the Milvus client, for operations that are not covered by the MilvusStorage class.

Returns:

The Milvus client instance.

Return type:

Any

delete(ids: List[str], **kwargs: Any) → None

Deletes a list of vectors identified by their IDs from the storage. If you are unsure of the IDs, first query the collection to retrieve the corresponding data.

Parameters:
  • ids (List[str]) – List of unique identifiers for the vectors to be deleted.

  • **kwargs (Any) – Additional keyword arguments passed to delete.

Raises:

RuntimeError – If there is an error during the deletion process.

load() → None

Loads the collection hosted on the cloud service.

query(query: VectorDBQuery, **kwargs: Any) → List[VectorDBQueryResult]

Searches for similar vectors in the storage based on the provided query.

Parameters:
  • query (VectorDBQuery) – The query object containing the search vector and the number of top similar vectors to retrieve.

  • **kwargs (Any) – Additional keyword arguments passed to search.

Returns:

A list of vectors retrieved from the storage based on similarity to the query vector.

Return type:

List[VectorDBQueryResult]

status() → VectorDBStatus

Retrieves the current status of the Milvus collection. This method provides information about the collection, including its vector dimensionality and the total number of vectors stored.

Returns:

An object containing information about the collection’s status.

Return type:

VectorDBStatus

class camel.storages.NebulaGraph(host, username, password, space, port=9669, timeout=10000)

Bases: BaseGraphStorage

add_graph_elements(graph_elements: List[GraphElement]) → None

Add graph elements (nodes and relationships) to the graph.

Parameters:

graph_elements (List[GraphElement]) – A list of graph elements containing nodes and relationships.

add_node(node_id: str, tag_name: str) → None

Add a node with the specified tag and properties.

Parameters:
  • node_id (str) – The ID of the node.

  • tag_name (str) – The tag name of the node.

add_triplet(subj: str, obj: str, rel: str) → None

Adds a relationship (triplet) between two entities in the Nebula Graph database.

Parameters:
  • subj (str) – The identifier for the subject entity.

  • obj (str) – The identifier for the object entity.

  • rel (str) – The relationship between the subject and object.

delete_entity(entity_id: str) → None

Deletes an entity (vertex) from the graph.

Parameters:

entity_id (str) – The identifier of the entity to be deleted.

delete_triplet(subj: str, obj: str, rel: str) → None

Deletes a specific triplet (relationship between two entities) from the Nebula Graph database.

Parameters:
  • subj (str) – The identifier for the subject entity.

  • obj (str) – The identifier for the object entity.

  • rel (str) – The relationship between the subject and object.

ensure_edge_type_exists(edge_type: str) → None

Ensures that a specified edge type exists in the NebulaGraph database. If the edge type already exists, this method does nothing.

Parameters:

edge_type (str) – The name of the edge type to be created.

Raises:

Exception – If the edge type creation fails after multiple retry attempts, an exception is raised with the error message.

ensure_tag_exists(tag_name: str) → None

Ensures a tag is created in the NebulaGraph database. If the tag already exists, it does nothing.

Parameters:

tag_name (str) – The name of the tag to be created.

Raises:

Exception – If the tag creation fails after retries, an exception is raised with the error message.
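Both ensure_* methods follow a "create if missing, retry on failure" pattern. The sketch below illustrates that pattern only: FakeSession and ensure_with_retry are hypothetical, the retry count and delay are assumed values, and a real NebulaGraph session returns a result object rather than a bool. The statement text uses nGQL's IF NOT EXISTS clause, which makes a repeated create a no-op when the tag or edge type already exists.

```python
import time
from typing import Callable, List


class FakeSession:
    """Hypothetical stand-in for a NebulaGraph session: fails N times, then succeeds."""

    def __init__(self, failures_before_success: int) -> None:
        self.remaining_failures = failures_before_success
        self.statements: List[str] = []

    def execute(self, stmt: str) -> bool:
        self.statements.append(stmt)
        if self.remaining_failures > 0:
            self.remaining_failures -= 1
            return False
        return True


def ensure_with_retry(execute: Callable[[str], bool], stmt: str,
                      max_retries: int = 3, delay_s: float = 0.0) -> None:
    """Run an idempotent DDL statement, retrying up to max_retries times."""
    last_error = ""
    for attempt in range(1, max_retries + 1):
        if execute(stmt):
            return
        last_error = f"attempt {attempt} failed for: {stmt}"
        if attempt < max_retries:
            time.sleep(delay_s)  # back off before the next attempt
    # Mirrors the documented behavior: raise with the error message once retries run out.
    raise Exception(f"Failed after {max_retries} attempts; last error: {last_error}")


session = FakeSession(failures_before_success=2)
ensure_with_retry(session.execute, "CREATE TAG IF NOT EXISTS Person()")
print(len(session.statements))  # → 3
```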

property get_client: Any

Get the underlying graph storage client.

get_indexes()

Fetches the tag indexes from the database.

Returns:

A list of tag index names.

Return type:

List[str]

get_node_properties() → Tuple[List[str], List[Dict[str, Any]]]

Retrieve node properties from the graph.

Returns:

A tuple where the first element is a list of node schema properties, and the second element is a list of dictionaries representing node structures.

Return type:

Tuple[List[str], List[Dict[str, Any]]]

get_relationship_properties() → Tuple[List[str], List[Dict[str, Any]]]

Retrieve relationship (edge) properties from the graph.

Returns:

A tuple where the first element is a list of relationship schema properties, and the second element is a list of dictionaries representing relationship structures.

Return type:

Tuple[List[str], List[Dict[str, Any]]]

get_relationship_types() → List[str]

Retrieve relationship types from the graph.

Returns:

A list of relationship (edge) type names.

Return type:

List[str]

get_schema()

Generates a schema string describing node and relationship properties and relationships.

Returns:

A string describing the schema.

Return type:

str

property get_structured_schema: Dict[str, Any]

Generates a structured schema consisting of node and relationship properties, relationships, and metadata.

Returns:

A dictionary representing the structured schema.

Return type:

Dict[str, Any]

query(query: str) → ResultSet

Execute a query on the graph store.

Parameters:

query (str) – The Cypher-like query to be executed.

Returns:

The result set of the query execution.

Return type:

ResultSet

Raises:

ValueError – If the query execution fails.

refresh_schema() → None

Refreshes the schema by fetching the latest schema details.

class camel.storages.Neo4jGraph(url: str, username: str, password: str, database: str = 'neo4j', timeout: float | None = None, truncate: bool = False)

Bases: BaseGraphStorage

Provides a connection to a Neo4j database for various graph operations.

Detailed information about Neo4j is available at: https://neo4j.com/docs/getting-started

This module draws on the work of LangChain and LlamaIndex.

Parameters:
  • url (str) – The URL of the Neo4j database server.

  • username (str) – The username for database authentication.

  • password (str) – The password for database authentication.

  • database (str) – The name of the database to connect to. Defaults to neo4j.

  • timeout (Optional[float]) – The timeout for transactions in seconds. Useful for terminating long-running queries. Defaults to None.

  • truncate (bool) – A flag to indicate whether to remove lists with more than LIST_LIMIT elements from results. Defaults to False.
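The truncate flag is useful when nodes carry large list properties (for example, an embedding vector) that would bloat every query result. Below is a minimal sketch of that filtering, assuming lists longer than the limit are removed from dictionaries and from enclosing lists; truncate_results and the LIST_LIMIT value are hypothetical illustrations, not Neo4jGraph's code or its actual constant.

```python
from typing import Any, Dict, List

LIST_LIMIT = 128  # assumed threshold, for illustration only
_DROP = object()  # sentinel meaning "this value was a too-long list; remove it"


def _sanitize(value: Any, limit: int) -> Any:
    if isinstance(value, dict):
        cleaned: Dict[str, Any] = {}
        for key, item in value.items():
            result = _sanitize(item, limit)
            if result is not _DROP:
                cleaned[key] = result
        return cleaned
    if isinstance(value, list):
        if len(value) > limit:
            return _DROP
        kept = []
        for item in value:
            result = _sanitize(item, limit)
            if result is not _DROP:
                kept.append(result)
        return kept
    return value  # scalars pass through unchanged


def truncate_results(rows: List[Dict[str, Any]], limit: int = LIST_LIMIT) -> List[Dict[str, Any]]:
    """Remove lists with more than `limit` elements from every result row."""
    return [_sanitize(row, limit) for row in rows]


rows = [{"name": "Alice", "embedding": [0.1] * 500, "tags": ["a", "b"]}]
print(truncate_results(rows))  # → [{'name': 'Alice', 'tags': ['a', 'b']}]
```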

add_graph_elements(graph_elements: List[GraphElement], include_source: bool = False, base_entity_label: bool = False) → None

Adds nodes and relationships from a list of GraphElement objects to the graph storage.

Parameters:
  • graph_elements (List[GraphElement]) – A list of GraphElement objects that contain the nodes and relationships to be added to the graph. Each GraphElement should encapsulate the structure of part of the graph, including nodes, relationships, and the source element information.

  • include_source (bool, optional) – If True, stores the source element and links it to nodes in the graph using the MENTIONS relationship. This is useful for tracing data back to its origin. Source elements are merged on the id property from the source element metadata if available; otherwise, they are merged on the MD5 hash of page_content. Defaults to False.

  • base_entity_label (bool, optional) – If True, each newly created node gets a secondary BASE_ENTITY_LABEL label, which is indexed and improves import speed and performance. Defaults to False.
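The merge-key rule for include_source can be expressed in a few lines. This is a hypothetical helper (source_merge_key is not part of camel) that assumes the key is the metadata id when present and the MD5 hex digest of page_content otherwise. Because identical text always hashes to the same digest, re-importing the same source merges into one node instead of creating a duplicate.

```python
import hashlib
from typing import Any, Dict, Optional


def source_merge_key(page_content: str, metadata: Optional[Dict[str, Any]] = None) -> str:
    """Hypothetical helper: choose the key used to MERGE a source node."""
    if metadata and "id" in metadata:
        return str(metadata["id"])  # prefer an explicit id from the metadata
    # Fall back to a content hash so identical text maps to the same node.
    return hashlib.md5(page_content.encode("utf-8")).hexdigest()


print(source_merge_key("Alice knows Bob.", {"id": "doc-42"}))  # → doc-42
print(len(source_merge_key("Alice knows Bob.")))  # → 32
```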

add_triplet(subj: str, obj: str, rel: str) → None

Adds a relationship (triplet) between two entities in the database.

Parameters:
  • subj (str) – The identifier for the subject entity.

  • obj (str) – The identifier for the object entity.

  • rel (str) – The relationship between the subject and object.

delete_triplet(subj: str, obj: str, rel: str) → None

Deletes a specific triplet, consisting of a subject, an object, and a relationship, from the graph.

Parameters:
  • subj (str) – The identifier for the subject entity.

  • obj (str) – The identifier for the object entity.

  • rel (str) – The relationship between the subject and object.

property get_client: Any

Get the underlying graph storage client.

property get_schema: str

Retrieve the schema of the Neo4jGraph store.

Parameters:

refresh (bool) – A flag indicating whether to forcibly refresh the schema from the Neo4jGraph store regardless of whether it is already cached. Defaults to False.

Returns:

The schema of the Neo4jGraph store.

Return type:

str

property get_structured_schema: Dict[str, Any]

Returns the structured schema of the graph.

Returns:

The structured schema of the graph.

Return type:

Dict[str, Any]

query(query: str, params: Dict[str, Any] | None = None) → List[Dict[str, Any]]

Executes a Neo4j Cypher declarative query in a database.

Parameters:
  • query (str) – The Cypher query to be executed.

  • params (Optional[Dict[str, Any]]) – A dictionary of parameters to be used in the query. Defaults to None.

Returns:

A list of dictionaries, each representing a row of results from the Cypher query.

Return type:

List[Dict[str, Any]]

Raises:

ValueError – If the executed Cypher query syntax is invalid.

refresh_schema() → None

Refreshes the Neo4j graph schema information by querying the database for node properties, relationship properties, and relationships.

class camel.storages.QdrantStorage(vector_dim: int, collection_name: str | None = None, url_and_api_key: Tuple[str, str] | None = None, path: str | None = None, distance: VectorDistance = VectorDistance.COSINE, delete_collection_on_del: bool = False, **kwargs: Any)

Bases: BaseVectorStorage

An implementation of BaseVectorStorage for interacting with Qdrant, a vector search engine.

Detailed information about Qdrant is available at: Qdrant

Parameters:
  • vector_dim (int) – The dimension of the stored vectors.

  • collection_name (Optional[str], optional) – Name of the collection in Qdrant. If not provided, it is set to the current time in ISO format. (default: None)

  • url_and_api_key (Optional[Tuple[str, str]], optional) – Tuple containing the URL and API key for connecting to a remote Qdrant instance. (default: None)

  • path (Optional[str], optional) – Path to a directory for initializing a local Qdrant client. (default: None)

  • distance (VectorDistance, optional) – The distance metric for vector comparison (default: VectorDistance.COSINE)

  • delete_collection_on_del (bool, optional) – Flag to determine if the collection should be deleted upon object destruction. (default: False)

  • **kwargs (Any) – Additional keyword arguments for initializing QdrantClient.

Notes

  • If url_and_api_key is provided, it takes priority and the client will attempt to connect to the remote Qdrant instance using the URL endpoint.

  • If url_and_api_key is not provided and path is given, the client will use the local path to initialize Qdrant.

  • If neither url_and_api_key nor path is provided, the client will be initialized with an in-memory storage (“:memory:”).
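The priority rules in these notes amount to a three-way branch. The sketch below is a hypothetical helper (resolve_qdrant_location is not part of camel) that returns keyword arguments one might pass to qdrant_client.QdrantClient, using that client's url, api_key, path, and location parameters; it illustrates the documented order and does not reproduce QdrantStorage's actual code.

```python
from typing import Any, Dict, Optional, Tuple


def resolve_qdrant_location(
    url_and_api_key: Optional[Tuple[str, str]] = None,
    path: Optional[str] = None,
) -> Dict[str, Any]:
    """Pick client keyword arguments following the documented priority order."""
    if url_and_api_key is not None:
        url, api_key = url_and_api_key
        return {"url": url, "api_key": api_key}  # 1. a remote instance takes priority
    if path is not None:
        return {"path": path}  # 2. local on-disk storage
    return {"location": ":memory:"}  # 3. fall back to in-memory storage


print(resolve_qdrant_location(("http://localhost:6333", "secret"), "/tmp/q"))
# → {'url': 'http://localhost:6333', 'api_key': 'secret'}
print(resolve_qdrant_location(path="/tmp/q"))  # → {'path': '/tmp/q'}
print(resolve_qdrant_location())  # → {'location': ':memory:'}
```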

add(records: List[VectorRecord], **kwargs) → None

Adds a list of vectors to the specified collection.

Parameters:
  • records (List[VectorRecord]) – List of vectors to be added.

  • **kwargs (Any) – Additional keyword arguments.

Raises:

RuntimeError – If there was an error in the addition process.

clear() → None

Removes all vectors from the storage.

property client: Any

Provides access to the underlying vector database client.

delete(ids: List[str], **kwargs: Any) → None

Deletes a list of vectors identified by their IDs from the storage.

Parameters:
  • ids (List[str]) – List of unique identifiers for the vectors to be deleted.

  • **kwargs (Any) – Additional keyword arguments.

Raises:

RuntimeError – If there is an error during the deletion process.

load() → None

Loads the collection hosted on the cloud service.

query(query: VectorDBQuery, **kwargs: Any) → List[VectorDBQueryResult]

Searches for similar vectors in the storage based on the provided query.

Parameters:
  • query (VectorDBQuery) – The query object containing the search vector and the number of top similar vectors to retrieve.

  • **kwargs (Any) – Additional keyword arguments.

Returns:

A list of vectors retrieved from the storage based on similarity to the query vector.

Return type:

List[VectorDBQueryResult]

status() → VectorDBStatus

Returns the status of the vector database.

Returns:

The vector database status.

Return type:

VectorDBStatus

class camel.storages.RedisStorage(sid: str, url: str = 'redis://localhost:6379', loop: AbstractEventLoop | None = None, **kwargs)

Bases: BaseKeyValueStorage

A concrete implementation of BaseKeyValueStorage that uses Redis as the backend. It is suitable for distributed cache systems that require persistence and high availability.

clear() → None

Removes all records from the key-value storage system.

property client: Redis | None

Returns the Redis client instance.

Returns:

The Redis client instance.

Return type:

redis.asyncio.Redis

async close() → None

Closes the Redis client asynchronously.

load() → List[Dict[str, Any]]

Loads all stored records from the key-value storage system.

Returns:

A list of dictionaries, where each dictionary represents a stored record.

Return type:

List[Dict[str, Any]]

save(records: List[Dict[str, Any]], expire: int | None = None) → None

Saves a batch of records to the key-value storage system.

class camel.storages.VectorDBQuery(query_vector: List[float], top_k: int)

Bases: BaseModel

Represents a query to a vector database.

query_vector

The numerical representation of the query vector.

Type:

List[float]

top_k

The number of top similar vectors to retrieve from the database. (default: 1)

Type:

int, optional

model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[Dict[str, FieldInfo]] = {'query_vector': FieldInfo(annotation=List[float], required=True), 'top_k': FieldInfo(annotation=int, required=False, default=1)}

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo objects.

This replaces Model.__fields__ from Pydantic V1.

query_vector: List[float]

The numerical representation of the query vector.

top_k: int

The number of top similar vectors to retrieve from the database.

class camel.storages.VectorDBQueryResult(*, record: VectorRecord, similarity: float)

Bases: BaseModel

Encapsulates the result of a query against a vector database.

record

The target vector record.

Type:

VectorRecord

similarity

The similarity score between the query vector and the record.

Type:

float

classmethod create(similarity: float, vector: List[float], id: str, payload: Dict[str, Any] | None = None) → VectorDBQueryResult

A class method to construct a VectorDBQueryResult instance.
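The convenience of create is that it takes flat arguments and builds the nested record itself, so a backend can turn a raw search hit (id, vector, payload, score) into a result in one call. With camel installed, the documented signature would be called as VectorDBQueryResult.create(similarity=0.92, vector=[0.1, 0.2], id="doc-1", payload={"text": "hello"}). The runnable sketch below uses hypothetical dataclass stand-ins (the real classes are pydantic models) and assumes a random UUID string as the default id, as described for VectorRecord.

```python
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class VectorRecordSketch:
    """Hypothetical dataclass stand-in for VectorRecord."""
    vector: List[float]
    id: str = field(default_factory=lambda: str(uuid.uuid4()))  # random UUID when omitted
    payload: Optional[Dict[str, Any]] = None


@dataclass
class QueryResultSketch:
    """Hypothetical dataclass stand-in for VectorDBQueryResult."""
    record: VectorRecordSketch
    similarity: float

    @classmethod
    def create(cls, similarity: float, vector: List[float], id: str,
               payload: Optional[Dict[str, Any]] = None) -> "QueryResultSketch":
        # Flat arguments in, nested record out.
        return cls(record=VectorRecordSketch(vector=vector, id=id, payload=payload),
                   similarity=similarity)


result = QueryResultSketch.create(similarity=0.92, vector=[0.1, 0.2], id="doc-1",
                                  payload={"text": "hello"})
print(result.record.id, result.similarity)  # → doc-1 0.92
```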

model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[Dict[str, FieldInfo]] = {'record': FieldInfo(annotation=VectorRecord, required=True), 'similarity': FieldInfo(annotation=float, required=True)}

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo objects.

This replaces Model.__fields__ from Pydantic V1.

record: VectorRecord

similarity: float

class camel.storages.VectorRecord(*, vector: List[float], id: str = None, payload: Dict[str, Any] | None = None)

Bases: BaseModel

Encapsulates information about a vector’s unique identifier and its payload, which is primarily used as a data transfer object when saving to vector storage.

vector

The numerical representation of the vector.

Type:

List[float]

id

A unique identifier for the vector. If not provided, a random UUID will be assigned.

Type:

str, optional

payload

Any additional metadata or information related to the vector. (default: None)

Type:

Optional[Dict[str, Any]], optional

id: str

model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[Dict[str, FieldInfo]] = {'id': FieldInfo(annotation=str, required=False, default_factory=<lambda>), 'payload': FieldInfo(annotation=Union[Dict[str, Any], NoneType], required=False, default=None), 'vector': FieldInfo(annotation=List[float], required=True)}

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo objects.

This replaces Model.__fields__ from Pydantic V1.

payload: Dict[str, Any] | None

vector: List[float]