Source code for camel.toolkits.notion_toolkit
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
import os
from typing import List, Optional, cast
from camel.toolkits import FunctionTool
from camel.toolkits.base import BaseToolkit
def get_plain_text_from_rich_text(rich_text: List[dict]) -> str:
    r"""Concatenates the plain text of a sequence of rich text elements.

    Args:
        rich_text (List[dict]): Rich text elements. The "plain_text" entry
            of each element is used; an element without that key
            contributes an empty string.

    Returns:
        str: The plain text of every element, concatenated in order with
            no separator between elements.
    """
    fragments = (fragment.get("plain_text", "") for fragment in rich_text)
    return "".join(fragments)
def get_media_source_text(block: dict) -> str:
    r"""Builds a one-line description of a Notion media block.

    The source is looked up inside the entry of ``block`` keyed by the
    block's own type, trying in order: the "url" of its "external" entry,
    the "url" of its "file" entry, and finally its own "url" entry. When
    none of these keys is present, a placeholder naming the block type is
    used instead.

    Args:
        block (dict): A dictionary representing a Notion media block.

    Returns:
        str: ``"<caption>: <source>"`` when the block has a non-empty
            caption, otherwise just the source.
    """
    media_kind = block.get("type", "Unknown Type")
    payload = block.get(media_kind, {})

    # Resolve the source, preferring externally hosted media over
    # Notion-hosted files, and those over a bare URL on the block itself.
    external_url = payload.get("external", {}).get("url")
    if external_url:
        location = external_url
    else:
        hosted_url = payload.get("file", {}).get("url")
        if hosted_url:
            location = hosted_url
        else:
            location = payload.get(
                "url", "[Missing case for media block types]: " + media_kind
            )

    # Prefix the caption only when there is at least one caption element
    caption_parts = payload.get("caption", [])
    if not caption_parts:
        return location
    caption = get_plain_text_from_rich_text(caption_parts)
    return f"{caption}: {location}"
[docs]
class NotionToolkit(BaseToolkit):
    r"""A toolkit for retrieving information from the user's notion pages.

    Attributes:
        notion_token (Optional[str]): The token used to authenticate
            against the notion APIs; taken from the constructor argument
            or, failing that, the ``NOTION_TOKEN`` environment variable.
        notion_client (Client): The ``notion_client.Client`` instance used
            to call the notion APIs.
    """

    def __init__(
        self,
        notion_token: Optional[str] = None,
    ) -> None:
        r"""Initializes the NotionToolkit.

        Args:
            notion_token (Optional[str], optional): The token used to
                interact with notion APIs. When omitted or empty, the
                ``NOTION_TOKEN`` environment variable is used instead.
                (default: :obj:`None`)
        """
        # Imported here rather than at module level, so importing this
        # module does not itself require the notion_client package.
        from notion_client import Client

        self.notion_token = notion_token or os.environ.get("NOTION_TOKEN")
        self.notion_client = Client(auth=self.notion_token)

    def list_all_users(self) -> List[dict]:
        r"""Lists all users via the Notion integration.

        Returns:
            List[dict]: One dictionary per user with the keys "type",
                "name" and "workspace". "type" and "name" are
                :obj:`None` when the user object does not provide them;
                "workspace" is an empty string when no workspace name is
                available.
        """
        all_users_info: List[dict] = []
        cursor = None

        # Follow the pagination cursor until the API reports no more pages
        while True:
            response = cast(
                dict,
                self.notion_client.users.list(start_cursor=cursor),
            )
            all_users_info.extend(response["results"])
            if not response["has_more"]:
                break
            cursor = response["next_cursor"]

        formatted_users = []
        for user in all_users_info:
            user_type = user.get("type")
            # The type-specific details live under a key named after the
            # user type; tolerate that entry being absent or null.
            type_details = user.get(user_type) or {}
            formatted_users.append(
                {
                    "type": user_type,
                    "name": user.get("name"),
                    "workspace": type_details.get("workspace_name") or "",
                }
            )
        return formatted_users

    def list_all_pages(self) -> List[dict]:
        r"""Lists all pages in the Notion workspace.

        Returns:
            List[dict]: One dictionary per page with the keys "id" and
                "title". "title" is :obj:`None` when the page has no
                title content.
        """
        all_pages_info: List[dict] = []
        cursor = None

        # Follow the pagination cursor until the API reports no more pages
        while True:
            response = cast(
                dict,
                self.notion_client.search(
                    filter={"property": "object", "value": "page"},
                    start_cursor=cursor,
                ),
            )
            all_pages_info.extend(response["results"])
            if not response["has_more"]:
                break
            cursor = response["next_cursor"]

        return [
            {
                "id": page.get("id"),
                "title": self._get_page_title(page),
            }
            for page in all_pages_info
        ]

    @staticmethod
    def _get_page_title(page: dict) -> Optional[str]:
        r"""Extracts the title text from a page object.

        Args:
            page (dict): A dictionary representing a Notion page.

        Returns:
            Optional[str]: The concatenated text of all title fragments,
                or :obj:`None` when the page has no title fragments.
        """
        properties = page.get("properties") or {}

        # Pages that live in a database may name their title property
        # freely (e.g. "Name"), so identify it by its type and only fall
        # back to the conventional "title" key.
        title_property = next(
            (
                prop
                for prop in properties.values()
                if isinstance(prop, dict) and prop.get("type") == "title"
            ),
            None,
        )
        if title_property is None:
            title_property = properties.get("title") or {}

        fragments = title_property.get("title") or []
        if not fragments:
            return None

        # Join every fragment so titles split into several rich text
        # pieces are not truncated; fall back to the raw text content
        # for fragments that carry no "plain_text" entry.
        return "".join(
            fragment.get("plain_text")
            or (fragment.get("text") or {}).get("content")
            or ""
            for fragment in fragments
        )

    def get_notion_block_text_content(self, block_id: str) -> str:
        r"""Retrieves the text content of a Notion block.

        Args:
            block_id (str): The ID of the Notion block to retrieve.

        Returns:
            str: The text of all child blocks of the given block, joined
                with single spaces. Nested children are included.
        """
        blocks: List[dict] = []
        cursor = None

        # Follow the pagination cursor until the API reports no more pages
        while True:
            response = cast(
                dict,
                self.notion_client.blocks.children.list(
                    block_id=block_id, start_cursor=cursor
                ),
            )
            blocks.extend(response["results"])
            if not response["has_more"]:
                break
            cursor = response["next_cursor"]

        return " ".join(
            self.get_text_from_block(sub_block) for sub_block in blocks
        )

    def get_text_from_block(self, block: dict) -> str:
        r"""Extracts plain text from a Notion block based on its type.

        Args:
            block (dict): A dictionary representing a Notion block.

        Returns:
            str: The text extracted from the block, followed by the text
                of its child blocks when it has any.
        """
        block_type = block.get("type")
        block_content = block.get(block_type) or {}

        if "rich_text" in block_content:
            # Test for the key rather than truthiness so that an empty
            # line yields an empty string instead of falling through to
            # the "[Needs case added]" placeholder.
            text = get_plain_text_from_rich_text(
                block_content["rich_text"] or []
            )
        elif block_type == "unsupported":
            text = "[Unsupported block type]"
        elif block_type == "bookmark":
            text = block["bookmark"]["url"]
        elif block_type == "child_database":
            # Use other API endpoints for full database data
            text = block["child_database"]["title"]
        elif block_type == "child_page":
            text = block["child_page"]["title"]
        elif block_type in (
            "audio",
            "embed",
            "video",
            "file",
            "image",
            "pdf",
        ):
            text = get_media_source_text(block)
        elif block_type == "equation":
            text = block["equation"]["expression"]
        elif block_type == "link_preview":
            text = block["link_preview"]["url"]
        elif block_type == "synced_block":
            synced_from = block["synced_block"].get("synced_from")
            if synced_from:
                # The referenced ID is stored under a key named after
                # the value of the "type" entry of "synced_from".
                text = (
                    "This block is synced with a block with ID: "
                    f"{synced_from[synced_from['type']]}"
                )
            else:
                text = "Source sync block that another block is synced with."
        elif block_type == "table":
            # Fetch children for full table data
            text = f"Table width: {block['table']['table_width']}"
        elif block_type == "table_of_contents":
            text = f"ToC color: {block['table_of_contents']['color']}"
        elif block_type in ("breadcrumb", "column_list", "divider"):
            text = "No text available"
        else:
            text = "[Needs case added]"

        # Query children for blocks with children, separating their text
        # from the parent's so that words do not run together.
        if block.get("has_children"):
            children_text = self.get_notion_block_text_content(block["id"])
            if children_text:
                text = f"{text} {children_text}" if text else children_text

        return text

    def get_tools(self) -> List[FunctionTool]:
        r"""Returns a list of FunctionTool objects representing the
        functions in the toolkit.

        Returns:
            List[FunctionTool]: A list of FunctionTool objects
                representing the functions in the toolkit.
        """
        return [
            FunctionTool(self.list_all_pages),
            FunctionTool(self.list_all_users),
            FunctionTool(self.get_notion_block_text_content),
        ]