camel.storages package#
Subpackages#
- camel.storages.graph_storages package
- Submodules
- camel.storages.graph_storages.base module
- camel.storages.graph_storages.graph_element module
- camel.storages.graph_storages.neo4j_graph module
- Module contents
BaseGraphStorage
GraphElement
NebulaGraph
NebulaGraph.add_graph_elements()
NebulaGraph.add_node()
NebulaGraph.add_triplet()
NebulaGraph.delete_entity()
NebulaGraph.delete_triplet()
NebulaGraph.ensure_edge_type_exists()
NebulaGraph.ensure_tag_exists()
NebulaGraph.get_client
NebulaGraph.get_indexes()
NebulaGraph.get_node_properties()
NebulaGraph.get_relationship_properties()
NebulaGraph.get_relationship_types()
NebulaGraph.get_schema()
NebulaGraph.get_structured_schema
NebulaGraph.query()
NebulaGraph.refresh_schema()
Neo4jGraph
- camel.storages.key_value_storages package
- camel.storages.object_storages package
- camel.storages.vectordb_storages package
- Submodules
- camel.storages.vectordb_storages.base module
- camel.storages.vectordb_storages.milvus module
- camel.storages.vectordb_storages.qdrant module
- Module contents
Module contents#
- class camel.storages.BaseGraphStorage[source]#
Bases:
ABC
An abstract base class for graph storage systems.
- abstract add_triplet(subj: str, obj: str, rel: str) None [source]#
Adds a relationship (triplet) between two entities in the database.
- Parameters:
subj (str) – The identifier for the subject entity.
obj (str) – The identifier for the object entity.
rel (str) – The relationship between the subject and object.
- abstract delete_triplet(subj: str, obj: str, rel: str) None [source]#
Deletes a specific triplet from the graph, comprising a subject, object and relationship.
- Parameters:
subj (str) – The identifier for the subject entity.
obj (str) – The identifier for the object entity.
rel (str) – The relationship between the subject and object.
- abstract property get_client: Any#
Get the underlying graph storage client.
- abstract property get_schema: str#
Get the schema of the graph storage
- abstract property get_structured_schema: Dict[str, Any]#
Get the structured schema of the graph storage
- abstract query(query: str, params: Dict[str, Any] | None = None) List[Dict[str, Any]] [source]#
Query the graph store with statement and parameters.
- Parameters:
query (str) – The query to be executed.
params (Optional[Dict[str, Any]]) – A dictionary of parameters to be used in the query. Defaults to None.
- Returns:
- A list of dictionaries, each
dictionary represents a row of results from the query.
- Return type:
List[Dict[str, Any]]
- class camel.storages.BaseKeyValueStorage[source]#
Bases:
ABC
An abstract base class for key-value storage systems. Provides a consistent interface for saving, loading, and clearing data records without any loss of information.
An abstract base class designed to serve as a foundation for various key-value storage systems. The class primarily interacts through Python dictionaries.
This class is meant to be inherited by multiple types of key-value storage implementations, including, but not limited to, JSON file storage, NoSQL databases like MongoDB and Redis, as well as in-memory Python dictionaries.
- class camel.storages.BaseVectorStorage[source]#
Bases:
ABC
An abstract base class for vector storage systems.
- abstract add(records: List[VectorRecord], **kwargs: Any) None [source]#
Saves a list of vector records to the storage.
- Parameters:
records (List[VectorRecord]) – List of vector records to be saved.
**kwargs (Any) – Additional keyword arguments.
- Raises:
RuntimeError – If there is an error during the saving process.
- abstract property client: Any#
Provides access to the underlying vector database client.
- abstract delete(ids: List[str], **kwargs: Any) None [source]#
Deletes a list of vectors identified by their IDs from the storage.
- Parameters:
ids (List[str]) – List of unique identifiers for the vectors to be deleted.
**kwargs (Any) – Additional keyword arguments.
- Raises:
RuntimeError – If there is an error during the deletion process.
- get_payloads_by_vector(vector: List[float], top_k: int) List[Dict[str, Any]] [source]#
Returns payloads of top k vector records that closest to the given vector.
This function is a wrapper of BaseVectorStorage.query.
- Parameters:
vector (List[float]) – The search vector.
top_k (int) – The number of top similer vectors.
- Returns:
- A list of vector payloads retrieved
from the storage based on similarity to the query vector.
- Return type:
List[List[Dict[str, Any]]]
- abstract query(query: VectorDBQuery, **kwargs: Any) List[VectorDBQueryResult] [source]#
Searches for similar vectors in the storage based on the provided query.
- Parameters:
query (VectorDBQuery) – The query object containing the search vector and the number of top similar vectors to retrieve.
**kwargs (Any) – Additional keyword arguments.
- Returns:
- A list of vectors retrieved from the
storage based on similarity to the query vector.
- Return type:
List[VectorDBQueryResult]
- abstract status() VectorDBStatus [source]#
Returns status of the vector database.
- Returns:
The vector database status.
- Return type:
- class camel.storages.InMemoryKeyValueStorage[source]#
Bases:
BaseKeyValueStorage
A concrete implementation of the
BaseKeyValueStorage
using in-memory list. Ideal for temporary storage purposes, as data will be lost when the program ends.
- class camel.storages.JsonStorage(path: Path | None = None)[source]#
Bases:
BaseKeyValueStorage
A concrete implementation of the
BaseKeyValueStorage
using JSON files. Allows for persistent storage of records in a human-readable format.- Parameters:
path (Path, optional) – Path to the desired JSON file. If None, a default path ./chat_history.json will be used. (default:
None
)
- class camel.storages.MilvusStorage(vector_dim: int, url_and_api_key: Tuple[str, str], collection_name: str | None = None, **kwargs: Any)[source]#
Bases:
BaseVectorStorage
An implementation of the BaseVectorStorage for interacting with Milvus, a cloud-native vector search engine.
The detailed information about Milvus is available at: Milvus
- Parameters:
vector_dim (int) – The dimenstion of storing vectors.
url_and_api_key (Tuple[str, str]) – Tuple containing the URL and API key for connecting to a remote Milvus instance. URL maps to Milvus uri concept, typically “endpoint:port”. API key maps to Milvus token concept, for self-hosted it’s “username:pwd”, for Zilliz Cloud (fully-managed Milvus) it’s API Key.
collection_name (Optional[str], optional) – Name for the collection in the Milvus. If not provided, set it to the current time with iso format. (default:
None
)**kwargs (Any) – Additional keyword arguments for initializing MilvusClient.
- Raises:
ImportError – If pymilvus package is not installed.
- add(records: List[VectorRecord], **kwargs) None [source]#
Adds a list of vectors to the specified collection.
- Parameters:
records (List[VectorRecord]) – List of vectors to be added.
**kwargs (Any) – Additional keyword arguments pass to insert.
- Raises:
RuntimeError – If there was an error in the addition process.
- clear() None [source]#
Removes all vectors from the Milvus collection. This method deletes the existing collection and then recreates it with the same schema to effectively remove all stored vectors.
- property client: Any#
Provides direct access to the Milvus client. This property allows for direct interactions with the Milvus client for operations that are not covered by the MilvusStorage class.
- Returns:
The Milvus client instance.
- Return type:
Any
- delete(ids: List[str], **kwargs: Any) None [source]#
Deletes a list of vectors identified by their IDs from the storage. If unsure of ids you can first query the collection to grab the corresponding data.
- Parameters:
ids (List[str]) – List of unique identifiers for the vectors to be deleted.
**kwargs (Any) – Additional keyword arguments passed to delete.
- Raises:
RuntimeError – If there is an error during the deletion process.
- query(query: VectorDBQuery, **kwargs: Any) List[VectorDBQueryResult] [source]#
Searches for similar vectors in the storage based on the provided query.
- Parameters:
query (VectorDBQuery) – The query object containing the search vector and the number of top similar vectors to retrieve.
**kwargs (Any) – Additional keyword arguments passed to search.
- Returns:
- A list of vectors retrieved from the
storage based on similarity to the query vector.
- Return type:
List[VectorDBQueryResult]
- status() VectorDBStatus [source]#
Retrieves the current status of the Milvus collection. This method provides information about the collection, including its vector dimensionality and the total number of vectors stored.
- Returns:
- An object containing information about the
collection’s status.
- Return type:
- class camel.storages.NebulaGraph(host, username, password, space, port=9669, timeout=10000)[source]#
Bases:
BaseGraphStorage
- add_graph_elements(graph_elements: List[GraphElement]) None [source]#
Add graph elements (nodes and relationships) to the graph.
- Parameters:
graph_elements (List[GraphElement]) – A list of graph elements containing nodes and relationships.
- add_node(node_id: str, tag_name: str) None [source]#
Add a node with the specified tag and properties.
- Parameters:
node_id (str) – The ID of the node.
tag_name (str) – The tag name of the node.
- add_triplet(subj: str, obj: str, rel: str) None [source]#
Adds a relationship (triplet) between two entities in the Nebula Graph database.
- Parameters:
subj (str) – The identifier for the subject entity.
obj (str) – The identifier for the object entity.
rel (str) – The relationship between the subject and object.
- delete_entity(entity_id: str) None [source]#
Deletes an entity (vertex) from the graph.
- Parameters:
entity_id (str) – The identifier of the entity to be deleted.
- delete_triplet(subj: str, obj: str, rel: str) None [source]#
Deletes a specific triplet (relationship between two entities) from the Nebula Graph database.
- Parameters:
subj (str) – The identifier for the subject entity.
obj (str) – The identifier for the object entity.
rel (str) – The relationship between the subject and object.
- ensure_edge_type_exists(edge_type: str) None [source]#
Ensures that a specified edge type exists in the NebulaGraph database. If the edge type already exists, this method does nothing.
- Parameters:
edge_type (str) – The name of the edge type to be created.
- Raises:
Exception – If the edge type creation fails after multiple retry attempts, an exception is raised with the error message.
- ensure_tag_exists(tag_name: str) None [source]#
Ensures a tag is created in the NebulaGraph database. If the tag already exists, it does nothing.
- Parameters:
tag_name (str) – The name of the tag to be created.
- Raises:
Exception – If the tag creation fails after retries, an exception is raised with the error message.
- property get_client: Any#
Get the underlying graph storage client.
- get_indexes()[source]#
Fetches the tag indexes from the database.
- Returns:
A list of tag index names.
- Return type:
List[str]
- get_node_properties() Tuple[List[str], List[Dict[str, Any]]] [source]#
Retrieve node properties from the graph.
- Returns:
- A tuple where the first
element is a list of node schema properties, and the second element is a list of dictionaries representing node structures.
- Return type:
Tuple[List[str], List[Dict[str, Any]]]
- get_relationship_properties() Tuple[List[str], List[Dict[str, Any]]] [source]#
Retrieve relationship (edge) properties from the graph.
- Returns:
- A tuple where the first
element is a list of relationship schema properties, and the second element is a list of dictionaries representing relationship structures.
- Return type:
Tuple[List[str], List[Dict[str, Any]]]
- get_relationship_types() List[str] [source]#
Retrieve relationship types from the graph.
- Returns:
A list of relationship (edge) type names.
- Return type:
List[str]
- get_schema()[source]#
Generates a schema string describing node and relationship properties and relationships.
- Returns:
A string describing the schema.
- Return type:
str
- property get_structured_schema: Dict[str, Any]#
Generates a structured schema consisting of node and relationship properties, relationships, and metadata.
- Returns:
A dictionary representing the structured schema.
- Return type:
Dict[str, Any]
- class camel.storages.Neo4jGraph(url: str, username: str, password: str, database: str = 'neo4j', timeout: float | None = None, truncate: bool = False)[source]#
Bases:
BaseGraphStorage
Provides a connection to a Neo4j database for various graph operations.
The detailed information about Neo4j is available at: Neo4j https://neo4j.com/docs/getting-started
This module refered to the work of Langchian and Llamaindex.
- Parameters:
url (str) – The URL of the Neo4j database server.
username (str) – The username for database authentication.
password (str) – The password for database authentication.
database (str) – The name of the database to connect to. Defaults to neo4j.
timeout (Optional[float]) – The timeout for transactions in seconds. Useful for terminating long-running queries. Defaults to None.
truncate (bool) – A flag to indicate whether to remove lists with more than LIST_LIMIT elements from results. Defaults to False.
- add_graph_elements(graph_elements: List[GraphElement], include_source: bool = False, base_entity_label: bool = False) None [source]#
Adds nodes and relationships from a list of GraphElement objects to the graph storage.
- Parameters:
graph_elements (List[GraphElement]) – A list of GraphElement objects that contain the nodes and relationships to be added to the graph. Each GraphElement should encapsulate the structure of part of the graph, including nodes, relationships, and the source element information.
include_source (bool, optional) – If True, stores the source element and links it to nodes in the graph using the MENTIONS relationship. This is useful for tracing back the origin of data. Merges source elements based on the id property from the source element metadata if available; otherwise it calculates the MD5 hash of page_content for merging process. Defaults to False.
base_entity_label (bool, optional) – If True, each newly created node gets a secondary BASE_ENTITY_LABEL label, which is indexed and improves import speed and performance. Defaults to False.
- add_triplet(subj: str, obj: str, rel: str) None [source]#
Adds a relationship (triplet) between two entities in the database.
- Parameters:
subj (str) – The identifier for the subject entity.
obj (str) – The identifier for the object entity.
rel (str) – The relationship between the subject and object.
- delete_triplet(subj: str, obj: str, rel: str) None [source]#
Deletes a specific triplet from the graph, comprising a subject, object and relationship.
- Parameters:
subj (str) – The identifier for the subject entity.
obj (str) – The identifier for the object entity.
rel (str) – The relationship between the subject and object.
- property get_client: Any#
Get the underlying graph storage client.
- property get_schema: str#
Retrieve the schema of the Neo4jGraph store.
- Parameters:
refresh (bool) – A flag indicating whether to forcibly refresh the schema from the Neo4jGraph store regardless of whether it is already cached. Defaults to False.
- Returns:
The schema of the Neo4jGraph store.
- Return type:
str
- property get_structured_schema: Dict[str, Any]#
Returns the structured schema of the graph
- Returns:
The structured schema of the graph.
- Return type:
Dict[str, Any]
- query(query: str, params: Dict[str, Any] | None = None) List[Dict[str, Any]] [source]#
Executes a Neo4j Cypher declarative query in a database.
- Parameters:
query (str) – The Cypher query to be executed.
params (Optional[Dict[str, Any]]) – A dictionary of parameters to be used in the query. Defaults to None.
- Returns:
- A list of dictionaries, each
dictionary represents a row of results from the Cypher query.
- Return type:
List[Dict[str, Any]]
- Raises:
ValueError – If the executed Cypher query syntax is invalid.
- class camel.storages.QdrantStorage(vector_dim: int, collection_name: str | None = None, url_and_api_key: Tuple[str, str] | None = None, path: str | None = None, distance: VectorDistance = VectorDistance.COSINE, delete_collection_on_del: bool = False, **kwargs: Any)[source]#
Bases:
BaseVectorStorage
An implementation of the BaseVectorStorage for interacting with Qdrant, a vector search engine.
The detailed information about Qdrant is available at: Qdrant
- Parameters:
vector_dim (int) – The dimenstion of storing vectors.
collection_name (Optional[str], optional) – Name for the collection in the Qdrant. If not provided, set it to the current time with iso format. (default:
None
)url_and_api_key (Optional[Tuple[str, str]], optional) – Tuple containing the URL and API key for connecting to a remote Qdrant instance. (default:
None
)path (Optional[str], optional) – Path to a directory for initializing a local Qdrant client. (default:
None
)distance (VectorDistance, optional) – The distance metric for vector comparison (default:
VectorDistance.COSINE
)delete_collection_on_del (bool, optional) – Flag to determine if the collection should be deleted upon object destruction. (default:
False
)**kwargs (Any) – Additional keyword arguments for initializing QdrantClient.
Notes
If url_and_api_key is provided, it takes priority and the client will attempt to connect to the remote Qdrant instance using the URL endpoint.
If url_and_api_key is not provided and path is given, the client will use the local path to initialize Qdrant.
If neither url_and_api_key nor path is provided, the client will be initialized with an in-memory storage (“:memory:”).
- add(records: List[VectorRecord], **kwargs) None [source]#
Adds a list of vectors to the specified collection.
- Parameters:
vectors (List[VectorRecord]) – List of vectors to be added.
**kwargs (Any) – Additional keyword arguments.
- Raises:
RuntimeError – If there was an error in the addition process.
- property client: QdrantClient#
Provides access to the underlying vector database client.
- delete(ids: List[str] | None = None, payload_filter: Dict[str, Any] | None = None, **kwargs: Any) None [source]#
Deletes points from the collection based on either IDs or payload filters.
- Parameters:
ids (Optional[List[str]], optional) – List of unique identifiers for the vectors to be deleted.
payload_filter (Optional[Dict[str, Any]], optional) – A filter for the payload to delete points matching specific conditions. If ids is provided, payload_filter will be ignored unless both are combined explicitly.
**kwargs (Any) – Additional keyword arguments pass to QdrantClient. delete.
Examples
>>> # Delete points with IDs "1", "2", and "3" >>> storage.delete(ids=["1", "2", "3"]) >>> # Delete points with payload filter >>> storage.delete(payload_filter={"name": "Alice"})
- Raises:
ValueError – If neither ids nor payload_filter is provided.
RuntimeError – If there is an error during the deletion process.
Notes
- If ids is provided, the points with these IDs will be deleted
directly, and the payload_filter will be ignored.
- If ids is not provided but payload_filter is, then points
matching the payload_filter will be deleted.
- query(query: VectorDBQuery, filter_conditions: Dict[str, Any] | None = None, **kwargs: Any) List[VectorDBQueryResult] [source]#
Searches for similar vectors in the storage based on the provided query.
- Parameters:
query (VectorDBQuery) – The query object containing the search vector and the number of top similar vectors to retrieve.
filter_conditions (Optional[Dict[str, Any]], optional) – A dictionary specifying conditions to filter the query results.
**kwargs (Any) – Additional keyword arguments.
- Returns:
- A list of vectors retrieved from the
storage based on similarity to the query vector.
- Return type:
List[VectorDBQueryResult]
- status() VectorDBStatus [source]#
Returns status of the vector database.
- Returns:
The vector database status.
- Return type:
- update_payload(ids: List[str], payload: Dict[str, Any], **kwargs: Any) None [source]#
Updates the payload of the vectors identified by their IDs.
- Parameters:
ids (List[str]) – List of unique identifiers for the vectors to be updated.
payload (Dict[str, Any]) – List of payloads to be updated.
**kwargs (Any) – Additional keyword arguments.
- Raises:
RuntimeError – If there is an error during the update process.
- class camel.storages.RedisStorage(sid: str, url: str = 'redis://localhost:6379', loop: AbstractEventLoop | None = None, **kwargs)[source]#
Bases:
BaseKeyValueStorage
A concrete implementation of the
BaseCacheStorage
using Redis as the backend. This is suitable for distributed cache systems that require persistence and high availability.- property client: Redis | None#
Returns the Redis client instance.
- Returns:
The Redis client instance.
- Return type:
redis.asyncio.Redis
- class camel.storages.VectorDBQuery(query_vector: List[float], top_k: int)[source]#
Bases:
BaseModel
Represents a query to a vector database.
- query_vector#
The numerical representation of the query vector.
- Type:
List[float]
- top_k#
The number of top similar vectors to retrieve from the database. (default:
1
)- Type:
int, optional
- model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}#
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_config: ClassVar[ConfigDict] = {}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[Dict[str, FieldInfo]] = {'query_vector': FieldInfo(annotation=List[float], required=True), 'top_k': FieldInfo(annotation=int, required=False, default=1)}#
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.
This replaces Model.__fields__ from Pydantic V1.
- query_vector: List[float]#
The numerical representation of the query vector.
- top_k: int#
The number of top similar vectors to retrieve from the database.
- class camel.storages.VectorDBQueryResult(*, record: VectorRecord, similarity: float)[source]#
Bases:
BaseModel
Encapsulates the result of a query against a vector database.
- record#
The target vector record.
- Type:
- similarity#
The similarity score between the query vector and the record.
- Type:
float
- classmethod create(similarity: float, vector: List[float], id: str, payload: Dict[str, Any] | None = None) VectorDBQueryResult [source]#
A class method to construct a VectorDBQueryResult instance.
- model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}#
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_config: ClassVar[ConfigDict] = {}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[Dict[str, FieldInfo]] = {'record': FieldInfo(annotation=VectorRecord, required=True), 'similarity': FieldInfo(annotation=float, required=True)}#
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.
This replaces Model.__fields__ from Pydantic V1.
- record: VectorRecord#
- similarity: float#
- class camel.storages.VectorRecord(*, vector: List[float], id: str = None, payload: Dict[str, Any] | None = None)[source]#
Bases:
BaseModel
Encapsulates information about a vector’s unique identifier and its payload, which is primarily used as a data transfer object when saving to vector storage.
- vector#
The numerical representation of the vector.
- Type:
List[float]
- id#
A unique identifier for the vector. If not provided, an random uuid will be assigned.
- Type:
str, optional
- payload#
Any additional metadata or information related to the vector. (default:
None
)- Type:
Optional[Dict[str, Any]], optional
- id: str#
- model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}#
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_config: ClassVar[ConfigDict] = {}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[Dict[str, FieldInfo]] = {'id': FieldInfo(annotation=str, required=False, default_factory=<lambda>), 'payload': FieldInfo(annotation=Union[Dict[str, Any], NoneType], required=False, default=None), 'vector': FieldInfo(annotation=List[float], required=True)}#
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.
This replaces Model.__fields__ from Pydantic V1.
- payload: Dict[str, Any] | None#
- vector: List[float]#