Source code for camel.toolkits.github_toolkit
# =========== Copyright 2023 @ CAMEL-AI.org. All Rights Reserved. ===========
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =========== Copyright 2023 @ CAMEL-AI.org. All Rights Reserved. ===========
import logging
import os
from typing import Dict, List, Literal, Optional, Union
from camel.toolkits import FunctionTool
from camel.toolkits.base import BaseToolkit
from camel.utils import dependencies_required
logger = logging.getLogger(__name__)
[docs]
class GithubToolkit(BaseToolkit):
r"""A class representing a toolkit for interacting with GitHub
repositories.
This class provides methods for retrieving open issues, retrieving
specific issues, and creating pull requests in a GitHub repository.
Args:
repo_name (str): The name of the GitHub repository.
access_token (str, optional): The access token to authenticate with
GitHub. If not provided, it will be obtained using the
`get_github_access_token` method.
"""
@dependencies_required('github')
def __init__(
    self, repo_name: str, access_token: Optional[str] = None
) -> None:
    r"""Sets up an authenticated GitHub client bound to one repository.

    Args:
        repo_name (str): The name of the GitHub repository; it is passed
            unchanged to `Github.get_repo`.
        access_token (str, optional): The access token used to
            authenticate with GitHub. When it is `None`, the token is
            obtained from `get_github_access_token`.
            (default: :obj:`None`)
    """
    from github import Auth, Github

    # Only an explicit `None` triggers the fallback lookup; any other
    # value is forwarded to the client unchanged.
    token = (
        self.get_github_access_token()
        if access_token is None
        else access_token
    )
    client = Github(auth=Auth.Token(token))
    self.github = client
    self.repo = client.get_repo(repo_name)
[docs]
def get_github_access_token(self) -> str:
    r"""Reads the GitHub access token from the process environment.

    Returns:
        str: The value of the `GITHUB_ACCESS_TOKEN` environment variable.

    Raises:
        ValueError: If `GITHUB_ACCESS_TOKEN` is unset or empty.
    """
    # A token can be generated at https://github.com/settings/tokens
    token = os.environ.get("GITHUB_ACCESS_TOKEN")
    if token:
        return token
    raise ValueError(
        "`GITHUB_ACCESS_TOKEN` not found in environment variables. Get"
        " it here: `https://github.com/settings/tokens`."
    )
[docs]
def create_pull_request(
    self,
    file_path: str,
    new_content: str,
    pr_title: str,
    body: str,
    branch_name: str,
) -> str:
    r"""Creates a pull request that updates a single file.

    A new branch is created from the current head of the repository's
    default branch, the file at `file_path` is overwritten with
    `new_content` on that branch, and a pull request from the new branch
    into the default branch is opened.

    Args:
        file_path (str): The path of the file to be updated in the
            repository.
        new_content (str): The new content of the file.
        pr_title (str): The title of the pull request.
        body (str): Text used both as the commit message of the file
            update and as the description of the pull request.
        branch_name (str): The name of the branch to create and submit the
            pull request from.

    Returns:
        str: A report containing the title and body of the created pull
            request, or a failure message if no pull request object was
            returned.

    Raises:
        ValueError: If `file_path` does not resolve to a single file. This
            is raised before any branch is created.
    """
    from github.ContentFile import ContentFile

    default_branch = self.repo.default_branch
    base_sha = self.repo.get_branch(default_branch).commit.sha

    # Read the file at the exact commit the new branch will start from, so
    # the blob sha handed to `update_file` matches the branch content even
    # if the default branch moves in the meantime.
    file = self.repo.get_contents(file_path, ref=base_sha)

    # Validate before touching the repository: failing here must not leave
    # a freshly created, unused branch behind.
    if not isinstance(file, ContentFile):
        raise ValueError("PRs with multiple files aren't supported yet.")

    self.repo.create_git_ref(
        ref=f"refs/heads/{branch_name}", sha=base_sha
    )
    self.repo.update_file(
        file.path, body, new_content, file.sha, branch=branch_name
    )
    pr = self.repo.create_pull(
        title=pr_title,
        body=body,
        head=branch_name,
        base=default_branch,
    )
    if pr is not None:
        return f"Title: {pr.title}\nBody: {pr.body}\n"
    return "Failed to create pull request."
[docs]
def get_issue_list(
    self, state: Literal["open", "closed", "all"] = "all"
) -> List[Dict[str, object]]:
    r"""Lists the number and title of issues in the GitHub repository.

    Args:
        state (Literal["open", "closed", "all"]): The state of the issues
            to retrieve. (default: :obj:`"all"`)
            Options are:
            - "open": Retrieve only open issues.
            - "closed": Retrieve only closed issues.
            - "all": Retrieve all issues, regardless of state.

    Returns:
        List[Dict[str, object]]: One dictionary per issue, with the keys
            `"number"` and `"title"`.
    """
    return [
        {"number": entry.number, "title": entry.title}
        for entry in self.repo.get_issues(state=state)
    ]
[docs]
def get_issue_content(self, issue_number: int) -> str:
    r"""Retrieves the body text of a specific issue by its number.

    Failures are reported through the returned string instead of being
    raised.

    Args:
        issue_number (int): The number of the issue to retrieve.

    Returns:
        str: The issue body, an empty string if the issue has no body, or
            an error message if the issue could not be retrieved.
    """
    try:
        issue = self.repo.get_issue(number=issue_number)
        # Keep the attribute access inside the `try` so that a failure
        # while reading it is reported the same way as a failed lookup.
        content = issue.body
    except Exception as e:
        return f"can't get Issue number {issue_number}: {e!s}"
    # An issue without a description has no body; honour the `str` return
    # type instead of leaking `None` to the caller.
    return content if content is not None else ""
[docs]
def get_pull_request_list(
    self, state: Literal["open", "closed", "all"] = "all"
) -> List[Dict[str, object]]:
    r"""Lists the number and title of pull requests in the repository.

    Args:
        state (Literal["open", "closed", "all"]): The state of the pull
            requests to retrieve. (default: :obj:`"all"`)
            Options are:
            - "open": Retrieve only open pull requests.
            - "closed": Retrieve only closed pull requests.
            - "all": Retrieve all pull requests, regardless of state.

    Returns:
        List[Dict[str, object]]: One dictionary per pull request, with the
            keys `"number"` and `"title"`.
    """
    return [
        {"number": pull.number, "title": pull.title}
        for pull in self.repo.get_pulls(state=state)
    ]
[docs]
def get_pull_request_code(self, pr_number: int) -> List[Dict[str, str]]:
    r"""Retrieves the per-file code changes of a specific pull request.

    Args:
        pr_number (int): The number of the pull request to retrieve.

    Returns:
        List[Dict[str, str]]: One dictionary per changed file, with the
            keys `"filename"` and `"patch"` (the diff reported for that
            file).
    """
    pull = self.repo.get_pull(number=pr_number)
    return [
        {"filename": changed.filename, "patch": changed.patch}
        for changed in pull.get_files()
    ]
[docs]
def get_pull_request_comments(
    self, pr_number: int
) -> List[Dict[str, str]]:
    r"""Retrieves the comments of a specific pull request.

    Args:
        pr_number (int): The number of the pull request to retrieve.

    Returns:
        List[Dict[str, str]]: One dictionary per comment, with the keys
            `"user"` (the author's login) and `"body"` (the comment text).
    """
    # NOTE(review): in PyGithub, `PullRequest.get_comments()` appears to
    # yield review (diff) comments only, while conversation comments come
    # from `get_issue_comments()` -- confirm this is the intended scope.
    pull = self.repo.get_pull(number=pr_number)
    return [
        {"user": note.user.login, "body": note.body}
        for note in pull.get_comments()
    ]
[docs]
def get_all_file_paths(self, path: str = "") -> List[str]:
    r"""Recursively collects the paths of all files under a repository
    path.

    Args:
        path (str): The repository path to start the traversal from. An
            empty string starts from the root directory.
            (default: :obj:`""`)

    Returns:
        List[str]: The file paths found under `path`, in traversal order.
            If `path` itself is a file, the list contains only that path.
    """
    from github.ContentFile import ContentFile

    listing = self.repo.get_contents(path)

    # A single `ContentFile` means `path` is a file, not a directory.
    if isinstance(listing, ContentFile):
        return [listing.path]

    collected: List[str] = []
    for entry in listing:
        if entry.type != "dir":
            collected.append(entry.path)
            continue
        # Descend into the sub-directory and splice its files in place.
        collected.extend(self.get_all_file_paths(entry.path))
    return collected
[docs]
def retrieve_file_content(self, file_path: str) -> str:
    r"""Retrieves the content of a file from the GitHub repository.

    Args:
        file_path (str): The path of the file to retrieve.

    Returns:
        str: The content of the file, decoded with the default codec of
            `bytes.decode`.

    Raises:
        ValueError: If `file_path` does not resolve to a single file.
    """
    from github.ContentFile import ContentFile

    file_content = self.repo.get_contents(file_path)
    # Anything other than one `ContentFile` cannot be decoded as a single
    # document, so reject it with a message that names the offending path.
    if not isinstance(file_content, ContentFile):
        raise ValueError(
            f"Expected `{file_path}` to be a single file, but it resolved "
            "to multiple entries (e.g. a directory)."
        )
    return file_content.decoded_content.decode()
[docs]
def get_tools(self) -> List[FunctionTool]:
    r"""Wraps the toolkit's public methods as FunctionTool objects.

    Returns:
        List[FunctionTool]: One FunctionTool per exposed method, in a
            fixed order.
    """
    # Bound methods exposed as tools; the order here is the order of the
    # returned list.
    exposed = (
        self.create_pull_request,
        self.get_issue_list,
        self.get_issue_content,
        self.get_pull_request_list,
        self.get_pull_request_code,
        self.get_pull_request_comments,
        self.get_all_file_paths,
        self.retrieve_file_content,
    )
    return [FunctionTool(method) for method in exposed]