camel.storages.graph_storages package

Submodules#

camel.storages.graph_storages.base module#

class camel.storages.graph_storages.base.BaseGraphStorage[source]#

Bases: ABC

An abstract base class for graph storage systems.

abstract add_triplet(subj: str, obj: str, rel: str) None[source]#

Adds a relationship (triplet) between two entities in the database.

Parameters:
  • subj (str) – The identifier for the subject entity.

  • obj (str) – The identifier for the object entity.

  • rel (str) – The relationship between the subject and object.

abstract delete_triplet(subj: str, obj: str, rel: str) None[source]#

Deletes a specific triplet from the graph, comprising a subject, object and relationship.

Parameters:
  • subj (str) – The identifier for the subject entity.

  • obj (str) – The identifier for the object entity.

  • rel (str) – The relationship between the subject and object.

abstract property get_client: Any#

Get the underlying graph storage client.

abstract property get_schema: str#

Get the schema of the graph storage.

abstract property get_structured_schema: Dict[str, Any]#

Get the structured schema of the graph storage.

abstract query(query: str, params: Dict[str, Any] | None = None) List[Dict[str, Any]][source]#

Query the graph store with statement and parameters.

Parameters:
  • query (str) – The query to be executed.

  • params (Optional[Dict[str, Any]]) – A dictionary of parameters to be used in the query. Defaults to None.

Returns:

A list of dictionaries, where each dictionary represents a row of results from the query.

Return type:

List[Dict[str, Any]]

abstract refresh_schema() None[source]#

Refreshes the graph schema information.
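
The interface above can be illustrated with a toy in-memory backend. This is a minimal sketch, not part of the package: `InMemoryGraphStorage` is a hypothetical class that exposes the same members as `BaseGraphStorage` without importing it, and its `query` treats the query string as a plain relationship name, which is an assumption made only for this demo.

```python
from typing import Any, Dict, List, Optional, Set, Tuple


class InMemoryGraphStorage:
    """Toy stand-in exposing the same members as BaseGraphStorage.

    Hypothetical class for illustration; a real backend would subclass
    camel.storages.graph_storages.base.BaseGraphStorage instead.
    """

    def __init__(self) -> None:
        # Each triplet is stored as (subj, rel, obj).
        self._triplets: Set[Tuple[str, str, str]] = set()
        self._schema: str = ""

    @property
    def get_client(self) -> Any:
        # There is no external client here, so expose the raw triplet set.
        return self._triplets

    @property
    def get_schema(self) -> str:
        return self._schema

    @property
    def get_structured_schema(self) -> Dict[str, Any]:
        return {"relationships": sorted({rel for _, rel, _ in self._triplets})}

    def refresh_schema(self) -> None:
        # Rebuild the cached schema string from the current triplets.
        rels = self.get_structured_schema["relationships"]
        self._schema = "Relationships: " + ", ".join(rels)

    def add_triplet(self, subj: str, obj: str, rel: str) -> None:
        self._triplets.add((subj, rel, obj))

    def delete_triplet(self, subj: str, obj: str, rel: str) -> None:
        self._triplets.discard((subj, rel, obj))

    def query(
        self, query: str, params: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        # Assumption: "query" is just a relationship name to filter on and
        # "params" is ignored; a real backend would run a query language.
        return [
            {"subj": s, "rel": r, "obj": o}
            for s, r, o in sorted(self._triplets)
            if r == query
        ]


storage = InMemoryGraphStorage()
storage.add_triplet("Alice", "Bob", "KNOWS")
storage.add_triplet("Alice", "CAMEL", "USES")
storage.refresh_schema()
rows = storage.query("KNOWS")
```

Note that `get_client`, `get_schema` and `get_structured_schema` are read as attributes, not called, because the base class declares them as properties.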

camel.storages.graph_storages.graph_element module#

class camel.storages.graph_storages.graph_element.GraphElement(*, nodes: List[Node], relationships: List[Relationship], source: Element)[source]#

Bases: BaseModel

A graph element with lists of nodes and relationships.

nodes#

A list of nodes in the graph.

Type:

List[Node]

relationships#

A list of relationships in the graph.

Type:

List[Relationship]

source#

The element from which the graph information is derived.

Type:

Element

model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}#

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, FieldInfo]] = {'nodes': FieldInfo(annotation=List[Node], required=True), 'relationships': FieldInfo(annotation=List[Relationship], required=True), 'source': FieldInfo(annotation=Element, required=True)}#

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

nodes: List[Node]#
relationships: List[Relationship]#
source: Element#
class camel.storages.graph_storages.graph_element.Node(*, id: str | int, type: str = 'Node', properties: dict = None)[source]#

Bases: BaseModel

Represents a node in a graph with associated properties.

id#

A unique identifier for the node.

Type:

Union[str, int]

type#

The type of the node.

Type:

str

properties#

Additional properties and metadata associated with the node.

Type:

dict

id: str | int#
model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}#

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, FieldInfo]] = {'id': FieldInfo(annotation=Union[str, int], required=True), 'properties': FieldInfo(annotation=dict, required=False, default_factory=dict), 'type': FieldInfo(annotation=str, required=False, default='Node')}#

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

properties: dict#
type: str#
class camel.storages.graph_storages.graph_element.Relationship(*, subj: Node, obj: Node, type: str = 'Relationship', properties: dict = None)[source]#

Bases: BaseModel

Represents a directed relationship between two nodes in a graph.

subj#

The subject/source node of the relationship.

Type:

Node

obj#

The object/target node of the relationship.

Type:

Node

type#

The type of the relationship.

Type:

str

properties#

Additional properties associated with the relationship.

Type:

dict

model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}#

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, FieldInfo]] = {'obj': FieldInfo(annotation=Node, required=True), 'properties': FieldInfo(annotation=dict, required=False, default_factory=dict), 'subj': FieldInfo(annotation=Node, required=True), 'type': FieldInfo(annotation=str, required=False, default='Relationship')}#

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

obj: Node#
properties: dict#
subj: Node#
type: str#
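
The three models above fit together as follows. This is a minimal sketch that mirrors the documented field names and defaults using standard-library dataclasses so it runs without `camel` or Pydantic; the `*Sketch` class names are hypothetical stand-ins, and `source` is typed as `Any` in place of the real `Element`.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Union


@dataclass
class NodeSketch:
    id: Union[str, int]
    type: str = "Node"
    # default_factory gives every instance its own empty dict.
    properties: Dict[str, Any] = field(default_factory=dict)


@dataclass
class RelationshipSketch:
    subj: NodeSketch  # source node of the directed relationship
    obj: NodeSketch   # target node of the directed relationship
    type: str = "Relationship"
    properties: Dict[str, Any] = field(default_factory=dict)


@dataclass
class GraphElementSketch:
    nodes: List[NodeSketch]
    relationships: List[RelationshipSketch]
    source: Any  # stands in for the Element the graph was derived from


alice = NodeSketch(id="alice", type="Person")
bob = NodeSketch(id=2)  # ids may be str or int; type falls back to "Node"
knows = RelationshipSketch(subj=alice, obj=bob, type="KNOWS")
element = GraphElementSketch(
    nodes=[alice, bob], relationships=[knows], source="raw text"
)

# Mutating one node's properties does not affect the other node.
alice.properties["age"] = 30
```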

camel.storages.graph_storages.neo4j_graph module#

class camel.storages.graph_storages.neo4j_graph.Neo4jGraph(url: str, username: str, password: str, database: str = 'neo4j', timeout: float | None = None, truncate: bool = False)[source]#

Bases: BaseGraphStorage

Provides a connection to a Neo4j database for various graph operations.

Detailed information about Neo4j is available at https://neo4j.com/docs/getting-started

This module draws on the work of LangChain and LlamaIndex.

Parameters:
  • url (str) – The URL of the Neo4j database server.

  • username (str) – The username for database authentication.

  • password (str) – The password for database authentication.

  • database (str) – The name of the database to connect to. Defaults to neo4j.

  • timeout (Optional[float]) – The timeout for transactions in seconds. Useful for terminating long-running queries. Defaults to None.

  • truncate (bool) – A flag to indicate whether to remove lists with more than LIST_LIMIT elements from results. Defaults to False.
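
The effect of `truncate` can be pictured with a small filter over a result row. This is a sketch under assumptions, not the class's code: `drop_long_lists` is a hypothetical helper, and the `LIST_LIMIT` value of 3 is chosen only to keep the demo short, since the actual limit is not stated on this page.

```python
from typing import Any

LIST_LIMIT = 3  # hypothetical demo value; the real limit is not given here


def drop_long_lists(value: Any, limit: int = LIST_LIMIT) -> Any:
    """Recursively remove dict entries whose list value exceeds `limit`."""
    if isinstance(value, dict):
        cleaned = {}
        for key, item in value.items():
            if isinstance(item, list) and len(item) > limit:
                continue  # oversized list: drop the whole entry
            cleaned[key] = drop_long_lists(item, limit)
        return cleaned
    if isinstance(value, list):
        return [drop_long_lists(item, limit) for item in value]
    return value


row = {"name": "doc", "embedding": [0.1, 0.2, 0.3, 0.4], "tags": ["a", "b"]}
trimmed = drop_long_lists(row)
```

Here the four-element `embedding` list is removed while the shorter `tags` list is kept.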

add_graph_elements(graph_elements: List[GraphElement], include_source: bool = False, base_entity_label: bool = False) None[source]#

Adds nodes and relationships from a list of GraphElement objects to the graph storage.

Parameters:
  • graph_elements (List[GraphElement]) – A list of GraphElement objects that contain the nodes and relationships to be added to the graph. Each GraphElement should encapsulate the structure of part of the graph, including nodes, relationships, and the source element information.

  • include_source (bool, optional) – If True, stores the source element and links it to nodes in the graph using the MENTIONS relationship. This is useful for tracing back the origin of data. Merges source elements based on the id property from the source element metadata if available; otherwise it calculates the MD5 hash of page_content for merging process. Defaults to False.

  • base_entity_label (bool, optional) – If True, each newly created node gets a secondary BASE_ENTITY_LABEL label, which is indexed and improves import speed and performance. Defaults to False.
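
The merge rule described for `include_source` (use the `id` from the source metadata if present, otherwise an MD5 hash of `page_content`) could look roughly like this. `source_merge_key` is a hypothetical helper written for illustration; how the class itself computes the key is not shown on this page.

```python
import hashlib
from typing import Any, Dict


def source_merge_key(page_content: str, metadata: Dict[str, Any]) -> str:
    """Return the metadata id if present, else an MD5 hex digest of the text."""
    if metadata.get("id") is not None:
        return str(metadata["id"])
    # No explicit id: fall back to a content hash so identical text merges.
    return hashlib.md5(page_content.encode("utf-8")).hexdigest()


key_from_id = source_merge_key("some text", {"id": "doc-1"})
key_from_hash = source_merge_key("hello", {})
```

With a content-hash fallback, two source elements with identical text and no `id` would map to the same key and therefore merge into one stored source.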

add_triplet(subj: str, obj: str, rel: str) None[source]#

Adds a relationship (triplet) between two entities in the database.

Parameters:
  • subj (str) – The identifier for the subject entity.

  • obj (str) – The identifier for the object entity.

  • rel (str) – The relationship between the subject and object.

common_neighbour_aware_random_walk(graph_name: str, sampling_ratio: float, start_node_ids: List[int], node_label_stratification: bool = False, relationship_weight_property: str | None = None) Dict[str, Any][source]#

Runs the Common Neighbour Aware Random Walk (CNARW) sampling algorithm.

Parameters:
  • graph_name (str) – The name of the original graph in the graph catalog.

  • sampling_ratio (float) – The fraction of nodes in the original graph to be sampled.

  • start_node_ids (List[int]) – IDs of the initial set of nodes of the original graph from which the sampling random walks will start.

  • node_label_stratification (bool, optional) – If true, preserves the node label distribution of the original graph. Defaults to False.

  • relationship_weight_property (Optional[str], optional) – Name of the relationship property to use as weights. If unspecified, the algorithm runs unweighted. Defaults to None.

Returns:

A dictionary with the results of the CNARW sampling.

Return type:

Dict[str, Any]

delete_triplet(subj: str, obj: str, rel: str) None[source]#

Deletes a specific triplet from the graph, comprising a subject, object and relationship.

Parameters:
  • subj (str) – The identifier for the subject entity.

  • obj (str) – The identifier for the object entity.

  • rel (str) – The relationship between the subject and object.

property get_client: Any#

Get the underlying graph storage client.

property get_schema: str#

Retrieve the schema of the Neo4jGraph store.

Parameters:

refresh (bool) – A flag indicating whether to forcibly refresh the schema from the Neo4jGraph store regardless of whether it is already cached. Defaults to False.

Returns:

The schema of the Neo4jGraph store.

Return type:

str

property get_structured_schema: Dict[str, Any]#

Returns the structured schema of the graph.

Returns:

The structured schema of the graph.

Return type:

Dict[str, Any]

query(query: str, params: Dict[str, Any] | None = None) List[Dict[str, Any]][source]#

Executes a Neo4j Cypher declarative query in a database.

Parameters:
  • query (str) – The Cypher query to be executed.

  • params (Optional[Dict[str, Any]]) – A dictionary of parameters to be used in the query. Defaults to None.

Returns:

A list of dictionaries, where each dictionary represents a row of results from the Cypher query.

Return type:

List[Dict[str, Any]]

Raises:

ValueError – If the executed Cypher query syntax is invalid.
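
A query string and its `params` dictionary are typically prepared together. The sketch below builds such a pair without contacting a database. `build_triplet_lookup` is a hypothetical helper, and the assumption that nodes carry an `id` property is made only for this example. Values are passed as `$` parameters, while the relationship type, which generally cannot be supplied as a plain parameter, is validated and then interpolated.

```python
import re
from typing import Any, Dict, Tuple


def build_triplet_lookup(subj: str, rel: str) -> Tuple[str, Dict[str, Any]]:
    """Build a Cypher query string plus the params dict that goes with it."""
    # Validate the relationship type before placing it in the query text.
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", rel):
        raise ValueError(f"unsafe relationship type: {rel!r}")
    query = (
        f"MATCH (s {{id: $subj}})-[:{rel}]->(o) "
        "RETURN o.id AS obj"
    )
    # The subject value travels as a parameter, never as query text.
    return query, {"subj": subj}


cypher, params = build_triplet_lookup("Alice", "KNOWS")
```

The resulting pair could then be passed as `graph.query(cypher, params)`, where `graph` is an already connected `Neo4jGraph` instance.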

random_walk_with_restarts(graph_name: str, sampling_ratio: float, start_node_ids: List[int], restart_probability: float = 0.1, node_label_stratification: bool = False, relationship_weight_property: str | None = None) Dict[str, Any][source]#

Runs the Random Walk with Restarts (RWR) sampling algorithm.

Parameters:
  • graph_name (str) – The name of the original graph in the graph catalog.

  • sampling_ratio (float) – The fraction of nodes in the original graph to be sampled.

  • start_node_ids (List[int]) – IDs of the initial set of nodes of the original graph from which the sampling random walks will start.

  • restart_probability (float, optional) – The probability that a sampling random walk restarts from one of the start nodes. Defaults to 0.1.

  • node_label_stratification (bool, optional) – If true, preserves the node label distribution of the original graph. Defaults to False.

  • relationship_weight_property (Optional[str], optional) – Name of the relationship property to use as weights. If unspecified, the algorithm runs unweighted. Defaults to None.

Returns:

A dictionary with the results of the RWR sampling.

Return type:

Dict[str, Any]
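
The idea behind Random Walk with Restarts sampling can be shown on a small in-memory graph. This is a toy illustration of the technique only, not the implementation used by this method: `rwr_sample`, the adjacency-dict input, and the `seed` and `max_steps` parameters are all hypothetical.

```python
import random
from typing import Dict, List, Set


def rwr_sample(
    adjacency: Dict[int, List[int]],
    sampling_ratio: float,
    start_node_ids: List[int],
    restart_probability: float = 0.1,
    seed: int = 0,
    max_steps: int = 10_000,
) -> Set[int]:
    """Walk the graph, collecting nodes until the target fraction is visited."""
    rng = random.Random(seed)
    target = max(1, round(sampling_ratio * len(adjacency)))
    current = rng.choice(start_node_ids)
    visited = {current}
    for _ in range(max_steps):  # guard against unreachable targets
        if len(visited) >= target:
            break
        neighbours = adjacency[current]
        if not neighbours or rng.random() < restart_probability:
            # Restart: jump back to one of the start nodes.
            current = rng.choice(start_node_ids)
        else:
            # Otherwise step to a uniformly chosen neighbour.
            current = rng.choice(neighbours)
        visited.add(current)
    return visited


# A ring of 10 nodes, each linked to its two neighbours.
ring = {i: [(i - 1) % 10, (i + 1) % 10] for i in range(10)}
sample = rwr_sample(ring, sampling_ratio=0.5, start_node_ids=[0])
```

A higher `restart_probability` keeps the sample closer to the start nodes, while a lower one lets the walk drift further into the graph.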

refresh_schema() None[source]#

Refreshes the Neo4j graph schema information by querying the database for node properties, relationship properties, and relationships.

Module contents#

class camel.storages.graph_storages.BaseGraphStorage[source]#

Bases: ABC

An abstract base class for graph storage systems.

abstract add_triplet(subj: str, obj: str, rel: str) None[source]#

Adds a relationship (triplet) between two entities in the database.

Parameters:
  • subj (str) – The identifier for the subject entity.

  • obj (str) – The identifier for the object entity.

  • rel (str) – The relationship between the subject and object.

abstract delete_triplet(subj: str, obj: str, rel: str) None[source]#

Deletes a specific triplet from the graph, comprising a subject, object and relationship.

Parameters:
  • subj (str) – The identifier for the subject entity.

  • obj (str) – The identifier for the object entity.

  • rel (str) – The relationship between the subject and object.

abstract property get_client: Any#

Get the underlying graph storage client.

abstract property get_schema: str#

Get the schema of the graph storage.

abstract property get_structured_schema: Dict[str, Any]#

Get the structured schema of the graph storage.

abstract query(query: str, params: Dict[str, Any] | None = None) List[Dict[str, Any]][source]#

Query the graph store with statement and parameters.

Parameters:
  • query (str) – The query to be executed.

  • params (Optional[Dict[str, Any]]) – A dictionary of parameters to be used in the query. Defaults to None.

Returns:

A list of dictionaries, where each dictionary represents a row of results from the query.

Return type:

List[Dict[str, Any]]

abstract refresh_schema() None[source]#

Refreshes the graph schema information.

class camel.storages.graph_storages.GraphElement(*, nodes: List[Node], relationships: List[Relationship], source: Element)[source]#

Bases: BaseModel

A graph element with lists of nodes and relationships.

nodes#

A list of nodes in the graph.

Type:

List[Node]

relationships#

A list of relationships in the graph.

Type:

List[Relationship]

source#

The element from which the graph information is derived.

Type:

Element

model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}#

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, FieldInfo]] = {'nodes': FieldInfo(annotation=List[Node], required=True), 'relationships': FieldInfo(annotation=List[Relationship], required=True), 'source': FieldInfo(annotation=Element, required=True)}#

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

nodes: List[Node]#
relationships: List[Relationship]#
source: Element#
class camel.storages.graph_storages.NebulaGraph(host, username, password, space, port=9669, timeout=10000)[source]#

Bases: BaseGraphStorage

add_graph_elements(graph_elements: List[GraphElement]) None[source]#

Add graph elements (nodes and relationships) to the graph.

Parameters:

graph_elements (List[GraphElement]) – A list of graph elements containing nodes and relationships.

add_node(node_id: str, tag_name: str, time_label: str | None = None) None[source]#

Add a node with the specified tag and properties.

Parameters:
  • node_id (str) – The ID of the node.

  • tag_name (str) – The tag name of the node.

  • time_label (str, optional) – A specific timestamp to set for the node’s time label property. If not provided, no timestamp will be added. (default: None)

add_triplet(subj: str, obj: str, rel: str, time_label: str | None = None) None[source]#

Adds a relationship (triplet) between two entities in the Nebula Graph database.

Parameters:
  • subj (str) – The identifier for the subject entity.

  • obj (str) – The identifier for the object entity.

  • rel (str) – The relationship between the subject and object.

  • time_label (str, optional) – A specific timestamp to set for the time label property of the relationship. If not provided, no timestamp will be added. (default: None)

Raises:
  • ValueError – If the time_label format is invalid.

  • Exception – If creating the relationship fails.
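
The `ValueError` for a malformed `time_label` suggests a validation step before the relationship is written. The sketch below shows one way such a check could work. The expected format is not specified on this page, so `ASSUMED_FORMAT` (an ISO-8601-style `YYYY-MM-DDTHH:MM:SS` string) and the `validate_time_label` helper are assumptions for illustration only.

```python
from datetime import datetime
from typing import Optional

# Assumption: the real expected format is not documented in this section.
ASSUMED_FORMAT = "%Y-%m-%dT%H:%M:%S"


def validate_time_label(time_label: Optional[str]) -> Optional[str]:
    """Return the label unchanged if it parses, or raise ValueError."""
    if time_label is None:
        return None  # no timestamp requested, nothing to validate
    try:
        datetime.strptime(time_label, ASSUMED_FORMAT)
    except ValueError as exc:
        raise ValueError(f"Invalid time_label format: {time_label!r}") from exc
    return time_label


checked = validate_time_label("2024-01-31T12:00:00")
```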

delete_entity(entity_id: str) None[source]#

Deletes an entity (vertex) from the graph.

Parameters:

entity_id (str) – The identifier of the entity to be deleted.

delete_triplet(subj: str, obj: str, rel: str) None[source]#

Deletes a specific triplet (relationship between two entities) from the Nebula Graph database.

Parameters:
  • subj (str) – The identifier for the subject entity.

  • obj (str) – The identifier for the object entity.

  • rel (str) – The relationship between the subject and object.

ensure_edge_type_exists(edge_type: str, time_label: str | None = None) None[source]#

Ensures that a specified edge type exists in the NebulaGraph database. If the edge type already exists, this method does nothing.

Parameters:
  • edge_type (str) – The name of the edge type to be created.

  • time_label (str, optional) – A specific timestamp to set as the default value for the time label property. If not provided, no timestamp will be added. (default: None)

Raises:

Exception – If the edge type creation fails after multiple retry attempts, an exception is raised with the error message.

ensure_tag_exists(tag_name: str, time_label: str | None = None) None[source]#

Ensures a tag is created in the NebulaGraph database. If the tag already exists, it does nothing.

Parameters:
  • tag_name (str) – The name of the tag to be created.

  • time_label (str, optional) – A specific timestamp to set as the default value for the time label property. If not provided, no timestamp will be added. (default: None)

Raises:

Exception – If the tag creation fails after retries, an exception is raised with the error message.
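
Both `ensure_edge_type_exists` and `ensure_tag_exists` are documented as raising only after retries are exhausted. A generic retry loop of that shape is sketched below. `run_with_retries`, its `max_retries` and `delay` parameters, and the `flaky_create_tag` stand-in are hypothetical; the actual retry count and delay used by the class are not stated here.

```python
import time
from typing import Callable, Optional


def run_with_retries(
    action: Callable[[], None], max_retries: int = 3, delay: float = 0.0
) -> int:
    """Call `action` until it succeeds; return the number of attempts used."""
    last_error: Optional[Exception] = None
    for attempt in range(1, max_retries + 1):
        try:
            action()
            return attempt
        except Exception as exc:
            last_error = exc
            time.sleep(delay)  # wait before the next attempt
    raise Exception(f"Failed after {max_retries} attempts: {last_error}")


calls = {"count": 0}


def flaky_create_tag() -> None:
    # Stand-in for a schema statement that fails twice, then succeeds.
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("schema not ready")


attempts = run_with_retries(flaky_create_tag)
```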

property get_client: Any#

Get the underlying graph storage client.

get_indexes()[source]#

Fetches the tag indexes from the database.

Returns:

A list of tag index names.

Return type:

List[str]

get_node_properties() Tuple[List[str], List[Dict[str, Any]]][source]#

Retrieve node properties from the graph.

Returns:

A tuple where the first element is a list of node schema properties, and the second element is a list of dictionaries representing node structures.

Return type:

Tuple[List[str], List[Dict[str, Any]]]

get_relationship_properties() Tuple[List[str], List[Dict[str, Any]]][source]#

Retrieve relationship (edge) properties from the graph.

Returns:

A tuple where the first element is a list of relationship schema properties, and the second element is a list of dictionaries representing relationship structures.

Return type:

Tuple[List[str], List[Dict[str, Any]]]

get_relationship_types() List[str][source]#

Retrieve relationship types from the graph.

Returns:

A list of relationship (edge) type names.

Return type:

List[str]

get_schema()[source]#

Generates a schema string describing node and relationship properties and relationships.

Returns:

A string describing the schema.

Return type:

str

property get_structured_schema: Dict[str, Any]#

Generates a structured schema consisting of node and relationship properties, relationships, and metadata, including timestamps.

Returns:

A dictionary representing the structured schema.

Return type:

Dict[str, Any]

query(query: str) ResultSet[source]#

Execute a query on the graph store.

Parameters:

query (str) – The Cypher-like query to be executed.

Returns:

The result set of the query execution.

Return type:

ResultSet

Raises:

ValueError – If the query execution fails.

refresh_schema() None[source]#

Refreshes the schema by fetching the latest schema details.

class camel.storages.graph_storages.Neo4jGraph(url: str, username: str, password: str, database: str = 'neo4j', timeout: float | None = None, truncate: bool = False)[source]#

Bases: BaseGraphStorage

Provides a connection to a Neo4j database for various graph operations.

Detailed information about Neo4j is available at https://neo4j.com/docs/getting-started

This module draws on the work of LangChain and LlamaIndex.

Parameters:
  • url (str) – The URL of the Neo4j database server.

  • username (str) – The username for database authentication.

  • password (str) – The password for database authentication.

  • database (str) – The name of the database to connect to. Defaults to neo4j.

  • timeout (Optional[float]) – The timeout for transactions in seconds. Useful for terminating long-running queries. Defaults to None.

  • truncate (bool) – A flag to indicate whether to remove lists with more than LIST_LIMIT elements from results. Defaults to False.

add_graph_elements(graph_elements: List[GraphElement], include_source: bool = False, base_entity_label: bool = False) None[source]#

Adds nodes and relationships from a list of GraphElement objects to the graph storage.

Parameters:
  • graph_elements (List[GraphElement]) – A list of GraphElement objects that contain the nodes and relationships to be added to the graph. Each GraphElement should encapsulate the structure of part of the graph, including nodes, relationships, and the source element information.

  • include_source (bool, optional) – If True, stores the source element and links it to nodes in the graph using the MENTIONS relationship. This is useful for tracing back the origin of data. Merges source elements based on the id property from the source element metadata if available; otherwise it calculates the MD5 hash of page_content for merging process. Defaults to False.

  • base_entity_label (bool, optional) – If True, each newly created node gets a secondary BASE_ENTITY_LABEL label, which is indexed and improves import speed and performance. Defaults to False.

add_triplet(subj: str, obj: str, rel: str) None[source]#

Adds a relationship (triplet) between two entities in the database.

Parameters:
  • subj (str) – The identifier for the subject entity.

  • obj (str) – The identifier for the object entity.

  • rel (str) – The relationship between the subject and object.

common_neighbour_aware_random_walk(graph_name: str, sampling_ratio: float, start_node_ids: List[int], node_label_stratification: bool = False, relationship_weight_property: str | None = None) Dict[str, Any][source]#

Runs the Common Neighbour Aware Random Walk (CNARW) sampling algorithm.

Parameters:
  • graph_name (str) – The name of the original graph in the graph catalog.

  • sampling_ratio (float) – The fraction of nodes in the original graph to be sampled.

  • start_node_ids (List[int]) – IDs of the initial set of nodes of the original graph from which the sampling random walks will start.

  • node_label_stratification (bool, optional) – If true, preserves the node label distribution of the original graph. Defaults to False.

  • relationship_weight_property (Optional[str], optional) – Name of the relationship property to use as weights. If unspecified, the algorithm runs unweighted. Defaults to None.

Returns:

A dictionary with the results of the CNARW sampling.

Return type:

Dict[str, Any]

delete_triplet(subj: str, obj: str, rel: str) None[source]#

Deletes a specific triplet from the graph, comprising a subject, object and relationship.

Parameters:
  • subj (str) – The identifier for the subject entity.

  • obj (str) – The identifier for the object entity.

  • rel (str) – The relationship between the subject and object.

property get_client: Any#

Get the underlying graph storage client.

property get_schema: str#

Retrieve the schema of the Neo4jGraph store.

Parameters:

refresh (bool) – A flag indicating whether to forcibly refresh the schema from the Neo4jGraph store regardless of whether it is already cached. Defaults to False.

Returns:

The schema of the Neo4jGraph store.

Return type:

str

property get_structured_schema: Dict[str, Any]#

Returns the structured schema of the graph.

Returns:

The structured schema of the graph.

Return type:

Dict[str, Any]

query(query: str, params: Dict[str, Any] | None = None) List[Dict[str, Any]][source]#

Executes a Neo4j Cypher declarative query in a database.

Parameters:
  • query (str) – The Cypher query to be executed.

  • params (Optional[Dict[str, Any]]) – A dictionary of parameters to be used in the query. Defaults to None.

Returns:

A list of dictionaries, where each dictionary represents a row of results from the Cypher query.

Return type:

List[Dict[str, Any]]

Raises:

ValueError – If the executed Cypher query syntax is invalid.

random_walk_with_restarts(graph_name: str, sampling_ratio: float, start_node_ids: List[int], restart_probability: float = 0.1, node_label_stratification: bool = False, relationship_weight_property: str | None = None) Dict[str, Any][source]#

Runs the Random Walk with Restarts (RWR) sampling algorithm.

Parameters:
  • graph_name (str) – The name of the original graph in the graph catalog.

  • sampling_ratio (float) – The fraction of nodes in the original graph to be sampled.

  • start_node_ids (List[int]) – IDs of the initial set of nodes of the original graph from which the sampling random walks will start.

  • restart_probability (float, optional) – The probability that a sampling random walk restarts from one of the start nodes. Defaults to 0.1.

  • node_label_stratification (bool, optional) – If true, preserves the node label distribution of the original graph. Defaults to False.

  • relationship_weight_property (Optional[str], optional) – Name of the relationship property to use as weights. If unspecified, the algorithm runs unweighted. Defaults to None.

Returns:

A dictionary with the results of the RWR sampling.

Return type:

Dict[str, Any]

refresh_schema() None[source]#

Refreshes the Neo4j graph schema information by querying the database for node properties, relationship properties, and relationships.