Source code for camel.toolkits.data_commons_toolkit
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
import logging
from typing import Any, Dict, List, Optional, Union
from camel.toolkits import FunctionTool
from camel.toolkits.base import BaseToolkit
from camel.utils import MCPServer
logger = logging.getLogger(__name__)
[docs]
@MCPServer()
class DataCommonsToolkit(BaseToolkit):
r"""A class representing a toolkit for Data Commons.
This class provides methods for querying and retrieving data from the
Data Commons knowledge graph. It includes functionality for:
- Executing SPARQL queries
- Retrieving triples associated with nodes
- Fetching statistical time series data
- Analyzing property labels and values
- Retrieving places within a given place type
- Obtaining statistical values for specific variables and locations
All the data are grabbed from the knowledge graph of Data Commons.
Refer to https://datacommons.org/browser/ for more details.
"""
def __init__(self, timeout: Optional[float] = None):
r"""Initialize the DataCommonsToolkit.
Args:
timeout (Optional[float], optional): Maximum time in seconds to
wait for API calls to complete. If None, will wait indefinitely.
(default: :obj:`None`)
"""
super().__init__(timeout=timeout)
[docs]
def query_data_commons(
self,
query_string: str,
) -> Optional[List[Dict[str, Any]]]:
r"""Query the Data Commons knowledge graph using SPARQL.
Args:
query_string (str): A SPARQL query string.
Returns:
Optional[List[Dict[str, Any]]]: A list of dictionaries, each
representing a node matching the query conditions if success,
(default: :obj:`None`) otherwise.
Note:
- Only supports a limited subset of SPARQL functionality (ORDER BY,
DISTINCT, LIMIT).
- Each variable in the query should have a 'typeOf' condition.
- The Python SPARQL library currently only supports the V1 version
of the API.
Reference:
https://docs.datacommons.org/api/python/query.html
"""
import datacommons
try:
results = datacommons.query(query_string)
processed_results = [
{key: value for key, value in row.items()} for row in results
]
return processed_results
except Exception as e:
logger.error(
f"An error occurred while querying Data Commons: {e!s}"
)
return None
[docs]
def get_triples(
self, dcids: Union[str, List[str]], limit: int = 500
) -> Optional[Dict[str, List[tuple]]]:
r"""Retrieve triples associated with nodes.
Args:
dcids (Union[str, List[str]]): A single DCID or a list of DCIDs
to query.
limit (int): The maximum number of triples per
combination of property and type. (default: :obj:`500`)
Returns:
Optional[Dict[str, List[tuple]]]: A dictionary where keys are
DCIDs and values are lists of associated triples if success,
(default: :obj:`None`) otherwise.
Note:
- The function will raise a ValueError if any of the required
arguments are missing.
- The function will raise a TypeError if the dcids are not a string
or a list of strings.
- The function will raise a ValueError if the limit is not between
1 and 500.
- The function will raise a KeyError if one or more of the provided
DCIDs do not exist in the Data Commons knowledge graph.
- The function will raise an Exception if an unexpected error occurs.
Reference:
https://docs.datacommons.org/api/python/triple.html
"""
import datacommons
try:
result = datacommons.get_triples(dcids, limit)
return result
except Exception as e:
logger.error(f"An error occurred: {e!s}")
return None
[docs]
def get_stat_time_series(
self,
place: str,
stat_var: str,
measurement_method: Optional[str] = None,
observation_period: Optional[str] = None,
unit: Optional[str] = None,
scaling_factor: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
r"""Retrieve statistical time series for a place.
Args:
place (str): The dcid of the Place to query for.
stat_var (str): The dcid of the StatisticalVariable.
measurement_method (str, optional): The technique used for
measuring a statistical variable. (default: :obj:`None`)
observation_period (str, optional): The time period over which an
observation is made. (default: :obj:`None`)
scaling_factor (str, optional): Property of statistical variables
indicating factor by which a measurement is multiplied to fit
a certain format. (default: :obj:`None`)
unit (str, optional): The unit of measurement. (default:
:obj:`None`)
Returns:
Optional[Dict[str, Any]]: A dictionary containing the statistical
time series data if success, (default: :obj:`None`) otherwise.
Reference:
https://docs.datacommons.org/api/python/stat_series.html
"""
import datacommons_pandas
try:
result = datacommons_pandas.get_stat_series(
place,
stat_var,
measurement_method,
observation_period,
unit,
scaling_factor,
)
return result
except Exception as e:
logger.error(
f"An error occurred while querying Data Commons: {e!s}"
)
return None
[docs]
def get_property_labels(
self, dcids: Union[str, List[str]], out: bool = True
) -> Optional[Dict[str, List[str]]]:
r"""Retrieves and analyzes property labels for given DCIDs.
Args:
dcids (list): A list of Data Commons IDs (DCIDs) to analyze.
out (bool): Direction of properties to retrieve. (default:
:obj:`True`)
Returns:
Optional[Dict[str, List[str]]]: Analysis results for each DCID if
success, (default: :obj:`None`) otherwise.
Reference:
https://docs.datacommons.org/api/python/property_label.html
"""
import datacommons
try:
result = datacommons.get_property_labels(dcids, out=out)
return result
except Exception as e:
logger.error(
f"An error occurred while analyzing property labels: {e!s}"
)
return None
[docs]
def get_property_values(
self,
dcids: Union[str, List[str]],
prop: str,
out: Optional[bool] = True,
value_type: Optional[str] = None,
limit: Optional[int] = None,
) -> Optional[Dict[str, Any]]:
r"""Retrieves and analyzes property values for given DCIDs.
Args:
dcids (list): A list of Data Commons IDs (DCIDs) to analyze.
prop (str): The property to analyze.
value_type (str, optional): The type of the property value to
filter by. Defaults to NONE. Only applicable if the value
refers to a node.
out (bool, optional): The label's direction. (default: :obj:`True`)
(only returning response nodes directed towards the requested
node). If set to False, will only return response nodes
directed away from the request node. (default: :obj:`None`)
limit (int, optional): (≤ 500) Maximum number of values returned
per node. (default: :obj:`datacommons.utils._MAX_LIMIT`)
Returns:
Optional[Dict[str, Any]]: Analysis results for each DCID if
success, (default: :obj:`None`) otherwise.
Reference:
https://docs.datacommons.org/api/python/property_value.html
"""
import datacommons
try:
result = datacommons.get_property_values(
dcids, prop, out, value_type, limit
)
return result
except Exception as e:
logger.error(
f"An error occurred while analyzing property values: {e!s}"
)
return None
[docs]
def get_places_in(
self, dcids: list, place_type: str
) -> Optional[Dict[str, Any]]:
r"""Retrieves places within a given place type.
Args:
dcids (list): A list of Data Commons IDs (DCIDs) to analyze.
place_type (str): The type of the place to filter by.
Returns:
Optional[Dict[str, Any]]: Analysis results for each DCID if
success, (default: :obj:`None`) otherwise.
Reference:
https://docs.datacommons.org/api/python/place_in.html
"""
import datacommons
try:
result = datacommons.get_places_in(dcids, place_type)
return result
except Exception as e:
logger.error(
"An error occurred while retrieving places in a given place "
f"type: {e!s}"
)
return None
[docs]
def get_stat_value(
self,
place: str,
stat_var: str,
date: Optional[str] = None,
measurement_method: Optional[str] = None,
observation_period: Optional[str] = None,
unit: Optional[str] = None,
scaling_factor: Optional[str] = None,
) -> Optional[float]:
r"""Retrieves the value of a statistical variable for a given place
and date.
Args:
place (str): The DCID of the Place to query for.
stat_var (str): The DCID of the StatisticalVariable.
date (str, optional): The preferred date of observation in ISO
8601 format. If not specified, returns the latest observation.
(default: :obj:`None`)
measurement_method (str, optional): The DCID of the preferred
measurementMethod value. (default: :obj:`None`)
observation_period (str, optional): The preferred observationPeriod
value. (default: :obj:`None`)
unit (str, optional): The DCID of the preferred unit value.
(default: :obj:`None`)
scaling_factor (str, optional): The preferred scalingFactor value.
(default: :obj:`None`)
Returns:
Optional[float]: The value of the statistical variable for the
given place and date if success, (default: :obj:`None`)
otherwise.
Reference:
https://docs.datacommons.org/api/python/stat_value.html
"""
import datacommons
try:
result = datacommons.get_stat_value(
place,
stat_var,
date,
measurement_method,
observation_period,
unit,
scaling_factor,
)
return result
except Exception as e:
logger.error(
"An error occurred while retrieving the value of a "
f"statistical variable: {e!s}"
)
return None
[docs]
def get_stat_all(self, places: str, stat_vars: str) -> Optional[dict]:
r"""Retrieves the value of a statistical variable for a given place
and date.
Args:
places (str): The DCID IDs of the Place objects to query for.
(Here DCID stands for Data Commons ID, the unique identifier
assigned to all entities in Data Commons.)
stat_vars (str): The dcids of the StatisticalVariables at
https://datacommons.org/browser/StatisticalVariable
Returns:
Optional[dict]: A dictionary with the DCID of the place as the key
and a list of tuples as the value if success, (default:
:obj:`None`) otherwise.
Reference:
https://docs.datacommons.org/api/python/stat_all.html
"""
import datacommons
try:
result = datacommons.get_stat_all(places, stat_vars)
return result
except Exception as e:
logger.error(
"An error occurred while retrieving the value of a "
f"statistical variable: {e!s}"
)
return None
[docs]
def get_tools(self) -> List[FunctionTool]:
r"""Returns a list of FunctionTool objects representing the functions
in the toolkit.
Returns:
List[FunctionTool]: A list of FunctionTool objects representing
the functions in the toolkit.
"""
return [
FunctionTool(self.query_data_commons),
FunctionTool(self.get_triples),
FunctionTool(self.get_stat_time_series),
FunctionTool(self.get_property_labels),
FunctionTool(self.get_property_values),
FunctionTool(self.get_places_in),
FunctionTool(self.get_stat_value),
FunctionTool(self.get_stat_all),
]