# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
import time
from enum import Enum, auto
from string import Template
from typing import TYPE_CHECKING, List, Optional, Tuple, Union
if TYPE_CHECKING:
from github import Github
from pydantic import BaseModel
from camel.agents import ChatAgent
from camel.logger import get_logger
from camel.messages import BaseMessage
from camel.models import BaseModelBackend, ModelFactory
from camel.responses import ChatAgentResponse
from camel.retrievers import VectorRetriever
from camel.types import (
ModelPlatformType,
ModelType,
OpenAIBackendRole,
RoleType,
)
from camel.utils import track_agent
from camel.utils.chunker import CodeChunker
logger = get_logger(__name__)
class ProcessingMode(Enum):
FULL_CONTEXT = auto()
RAG = auto()
class GitHubFile(BaseModel):
r"""Model to hold GitHub file information.
Attributes:
content (str): The content of the GitHub text.
file_path (str): The path of the file.
html_url (str): The actual url of the file.
"""
content: str
file_path: str
html_url: str
class RepositoryInfo(BaseModel):
r"""Model to hold GitHub repository information.
Attributes:
repo_name (str): The full name of the repository.
repo_url (str): The URL of the repository.
contents (list): A list to hold the repository contents.
"""
repo_name: str
repo_url: str
contents: List[GitHubFile] = []
[docs]
@track_agent(name="RepoAgent")
class RepoAgent(ChatAgent):
r"""A specialized agent designed to interact with GitHub repositories for
code generation tasks.
The RepoAgent enhances a base ChatAgent by integrating context from
one or more GitHub repositories. It supports two processing modes:
- FULL_CONTEXT: loads and injects full repository content into the
prompt.
- RAG (Retrieval-Augmented Generation): retrieves relevant
code/documentation chunks using a vector store when context
length exceeds a specified token limit.
Attributes:
vector_retriever (VectorRetriever): Retriever used to
perform semantic search in RAG mode. Required if repo content
exceeds context limit.
system_message (Optional[str]): The system message
for the chat agent. (default: :str:`"You are a code assistant
with repo context."`)
repo_paths (Optional[List[str]]): List of GitHub repository URLs to
load during initialization. (default: :obj:`None`)
model (BaseModelBackend): The model backend to use for generating
responses. (default: :obj:`ModelPlatformType.DEFAULT`
with `ModelType.DEFAULT`)
max_context_tokens (Optional[int]): Maximum number of tokens allowed
before switching to RAG mode. (default: :obj:`2000`)
github_auth_token (Optional[str]): GitHub personal access token
for accessing private or rate-limited repositories. (default:
:obj:`None`)
chunk_size (Optional[int]): Maximum number of characters per code chunk
when indexing files for RAG. (default: :obj:`8192`)
top_k (int): Number of top-matching chunks to retrieve from the vector
store in RAG mode. (default: :obj:`5`)
similarity (Optional[float]): Minimum similarity score required to
include a chunk in the RAG context. (default: :obj:`0.6`)
collection_name (Optional[str]): Name of the vector database
collection to use for storing and retrieving chunks. (default:
:obj:`None`)
**kwargs: Inherited from ChatAgent
Note:
The current implementation of RAG mode requires using Qdrant as the
vector storage backend. The VectorRetriever defaults to QdrantStorage
if no storage is explicitly provided. Other vector storage backends
are not currently supported for the RepoAgent's RAG functionality.
"""
def __init__(
self,
vector_retriever: VectorRetriever,
system_message: Optional[
str
] = "You are a code assistant with repo context.",
repo_paths: Optional[List[str]] = None,
model: Optional[BaseModelBackend] = None,
max_context_tokens: int = 2000,
github_auth_token: Optional[str] = None,
chunk_size: Optional[int] = 8192,
top_k: Optional[int] = 5,
similarity: Optional[float] = 0.6,
collection_name: Optional[str] = None,
**kwargs,
):
if model is None:
model = ModelFactory.create(
model_platform=ModelPlatformType.DEFAULT,
model_type=ModelType.DEFAULT,
)
super().__init__(system_message=system_message, model=model, **kwargs)
self.max_context_tokens = max_context_tokens
self.vector_retriever = vector_retriever
self.github_auth_token = github_auth_token
self.chunk_size = chunk_size
self.num_tokens = 0
self.processing_mode = ProcessingMode.FULL_CONTEXT
self.top_k = top_k
self.similarity = similarity
self.collection_name = collection_name
self.prompt_template = Template(
"$type: $repo\n"
"You are an AI coding assistant. "
"Your task is to generate code based on provided GitHub "
"repositories. \n"
"### Instructions: \n1. **Analyze the Repositories**: "
"Identify which repositories contain relevant "
"information for the user's request. Ignore unrelated ones.\n"
"2. **Extract Context**: Use code, documentation, "
"dependencies, and tests to understand functionality.\n"
"3. **Generate Code**: Create clean, efficient, and "
"well-structured code that aligns with relevant repositories. \n"
"4. **Justify Output**: Explain which repositories "
"influenced your solution and why others were ignored."
"\n If the repositories lack necessary details, "
"infer best practices and suggest improvements.\n"
"Now, analyze the repositories and generate the "
"required code."
)
self.full_text = ""
self.chunker = CodeChunker(chunk_size=chunk_size or 8192)
self.repos: List[RepositoryInfo] = []
if repo_paths:
self.repos = self.load_repositories(repo_paths)
if len(self.repos) > 0:
self.construct_full_text()
self.num_tokens = self.count_tokens()
if not self.check_switch_mode():
self.update_memory(
message=BaseMessage.make_user_message(
role_name=RoleType.USER.value,
content=self.full_text,
),
role=OpenAIBackendRole.SYSTEM,
)
[docs]
def parse_url(self, url: str) -> Tuple[str, str]:
r"""Parse the GitHub URL and return the (owner, repo_name) tuple.
Args:
url (str): The URL to be parsed.
Returns:
Tuple[str, str]: The (owner, repo_name) tuple.
"""
try:
url_path = url.replace("https://github.com/", "")
parts = url_path.split("/")
if len(parts) != 2:
raise ValueError("Incorrect GitHub repo URL format.")
else:
return parts[0], parts[1]
except Exception as e:
logger.error(f"Error parsing URL: {e}")
raise Exception(e)
[docs]
def load_repositories(
self,
repo_urls: List[str],
) -> List[RepositoryInfo]:
r"""Load the content of a GitHub repository.
Args:
repo_urls (str): The list of Repo URLs.
Returns:
List[RepositoryInfo]: A list of objects containing information
about the all repositories, including the contents.
"""
from github import Github
github_client = Github(self.github_auth_token)
res = []
for repo_url in repo_urls:
try:
res.append(self.load_repository(repo_url, github_client))
except Exception as e:
logger.error(f"Error loading repository: {e}")
raise Exception(e)
time.sleep(1)
logger.info(f"Successfully loaded {len(res)} repositories.")
return res
[docs]
def load_repository(
self,
repo_url: str,
github_client: "Github",
) -> RepositoryInfo:
r"""Load the content of a GitHub repository.
Args:
repo_urls (str): The Repo URL to be loaded.
github_client (GitHub): The established GitHub client.
Returns:
RepositoryInfo: The object containing information
about the repository, including the contents.
"""
from github.ContentFile import ContentFile
try:
owner, repo_name = self.parse_url(repo_url)
repo = github_client.get_repo(f"{owner}/{repo_name}")
contents = repo.get_contents("")
except Exception as e:
logger.error(f"Error loading repository: {e}")
raise Exception(e)
info = RepositoryInfo(
repo_name=repo.full_name,
repo_url=repo.html_url,
contents=[],
)
# Create a list to process repository contents
content_list: List[ContentFile] = []
if isinstance(contents, list):
content_list = contents
else:
# Handle single ContentFile case
content_list = [contents]
while content_list:
file = content_list.pop(0)
if file.type == "file":
if any(
file.path.endswith(ext)
for ext in [
".png",
".jpg",
".pdf",
".zip",
".gitignore",
".mp4",
".avi",
".mov",
".mp3",
".wav",
".tar",
".gz",
".7z",
".rar",
".iso",
".gif",
".docx",
]
):
logger.info(f"Skipping binary file: {file.path}")
continue
try:
file_obj = repo.get_contents(file.path)
# Handle file_obj which could be a single ContentFile or a
# list
if isinstance(file_obj, list):
if not file_obj: # Skip empty lists
continue
file_obj = file_obj[
0
] # Take the first item if it's a list
if getattr(file_obj, "encoding", None) != "base64":
logger.warning(
f"Skipping file with unsupported "
f"encoding: {file.path}"
)
continue
try:
content_bytes = file_obj.decoded_content
file_content = content_bytes.decode("utf-8")
except UnicodeDecodeError:
logger.warning(f"Skipping non-UTF-8 file: {file.path}")
continue
except Exception as e:
logger.error(
f"Failed to decode file content at "
f"{file.path}: {e}"
)
continue
github_file = GitHubFile(
content=file_content,
file_path=f"{owner}/{repo_name}/{file.path}",
html_url=file.html_url,
)
info.contents.append(github_file)
except Exception as e:
logger.error(f"Error loading file: {e}")
raise Exception(e)
logger.info(f"Successfully loaded file: {file.path}")
elif file.type == "dir":
dir_contents = repo.get_contents(file.path)
# Handle dir_contents which could be a single ContentFile or a
# list
if isinstance(dir_contents, list):
content_list.extend(dir_contents)
else:
content_list.append(dir_contents)
return info
[docs]
def count_tokens(self) -> int:
r"""To count the tokens that's currently in the memory
Returns:
int: The number of tokens
"""
counter = self.model_backend.token_counter
content_token_count = counter.count_tokens_from_messages(
messages=[
BaseMessage.make_user_message(
role_name=RoleType.USER.value,
content=self.full_text,
).to_openai_message(OpenAIBackendRole.USER)
]
)
return content_token_count
[docs]
def construct_full_text(self):
r"""Construct full context text from repositories by concatenation."""
repo_texts = [
{"content": f.content, "path": f.file_path}
for repo in self.repos
for f in repo.contents
]
self.full_text = self.prompt_template.safe_substitute(
type="Repository",
repo="\n".join(
f"{repo['path']}\n{repo['content']}" for repo in repo_texts
),
)
[docs]
def add_repositories(self, repo_urls: List[str]):
r"""Add a GitHub repository to the list of repositories.
Args:
repo_urls (str): The Repo URL to be added.
"""
new_repos = self.load_repositories(repo_urls)
self.repos.extend(new_repos)
self.construct_full_text()
self.num_tokens = self.count_tokens()
if self.processing_mode == ProcessingMode.RAG:
for repo in new_repos:
for f in repo.contents:
self.vector_retriever.process(
content=f.content,
should_chunk=True,
extra_info={"file_path": f.file_path},
chunker=self.chunker,
)
else:
self.check_switch_mode()
[docs]
def check_switch_mode(self) -> bool:
r"""Check if the current context exceeds the context window; if so,
switch to RAG mode.
Returns:
bool: True if the mode was switched, False otherwise.
"""
if self.processing_mode == ProcessingMode.RAG:
return False
if self.num_tokens > self.max_context_tokens:
if not self.vector_retriever:
logger.warning(
f"Token count ({self.num_tokens}) exceeds limit "
f"({self.max_context_tokens}). "
"Either reduce repository size or provide a "
"VectorRetriever."
)
return False
logger.info("Switching to RAG mode and indexing repositories...")
self.processing_mode = ProcessingMode.RAG
for repo in self.repos:
for f in repo.contents:
self.vector_retriever.process(
content=f.content,
should_chunk=True,
extra_info={"file_path": f.file_path},
chunker=self.chunker,
)
self._system_message = None
self.reset()
return True
return False
[docs]
def step(
self, input_message: Union[BaseMessage, str], *args, **kwargs
) -> ChatAgentResponse:
r"""Overrides `ChatAgent.step()` to first retrieve relevant context
from the vector store before passing the input to the language model.
"""
if (
self.processing_mode == ProcessingMode.RAG
and self.vector_retriever
):
if isinstance(input_message, BaseMessage):
user_query = input_message.content
else:
user_query = input_message
retrieved_content = []
retries = 1
for attempt in range(retries):
try:
raw_rag_content = self.vector_retriever.query(
query=user_query,
top_k=self.top_k or 5,
similarity_threshold=self.similarity or 0.6,
)
# Remove duplicates and retrieve the whole file
paths = []
for record in raw_rag_content:
file_path = record["extra_info"]["file_path"]
if file_path not in paths:
retrieved_content.append(
{
"content": self.search_by_file_path(
file_path
),
"similarity": record["similarity score"],
}
)
paths.append(file_path)
retrieved_content = sorted(
retrieved_content,
key=lambda x: x["similarity"],
reverse=True,
)
full_prompt = self.prompt_template.safe_substitute(
type="Retrieved code",
repo="\n".join(
[record["content"] for record in retrieved_content]
),
)
new_query = user_query + "\n" + full_prompt
if isinstance(input_message, BaseMessage):
input_message.content = new_query
else:
input_message = BaseMessage.make_user_message(
role_name="User", content=new_query
)
break
except Exception:
if attempt < retries - 1:
sleep_time = 2**attempt
logger.info(
f"Retrying qdrant query in {sleep_time} seconds..."
)
time.sleep(sleep_time)
else:
logger.error(
f"Failed to query qdrant record after {retries} "
"attempts."
)
return super().step(input_message, *args, **kwargs)
[docs]
def reset(self):
super().reset()
if self.processing_mode == ProcessingMode.FULL_CONTEXT:
message = BaseMessage.make_user_message(
role_name=RoleType.USER.value,
content=self.full_text,
)
self.update_memory(message, OpenAIBackendRole.SYSTEM)
else:
self.num_tokens = 0
[docs]
def search_by_file_path(self, file_path: str) -> str:
r"""Search for all payloads in the vector database where
file_path matches the given value (the same file),
then sort by piece_num and concatenate text fields to return a
complete result.
Args:
file_path (str): The `file_path` value to filter the payloads.
Returns:
str: A concatenated string of the `text` fields sorted by
`piece_num`.
"""
from qdrant_client.models import FieldCondition, Filter, MatchValue
try:
storage_instance = self.vector_retriever.storage
collection_name = (
self.collection_name or storage_instance.collection_name # type: ignore[attr-defined]
)
source_data, _ = storage_instance.client.scroll(
collection_name=collection_name,
limit=1000,
scroll_filter=Filter(
must=[
FieldCondition(
key="extra_info.file_path",
match=MatchValue(value=file_path),
)
]
),
with_payload=True,
with_vectors=False,
)
except Exception as e:
logger.error(
f"Error during database initialization or scroll: {e}"
)
raise Exception(e)
results = []
for point in source_data:
payload = point.payload
piece_num = payload["metadata"]["piece_num"]
text = payload["text"]
if piece_num is not None and text:
results.append({"piece_num": piece_num, "text": text})
sorted_results = sorted(results, key=lambda x: x["piece_num"])
full_doc = "\n".join([item["text"] for item in sorted_results])
return full_doc