# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
import os
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, List, Optional
import discord
import httpx
from fastapi import FastAPI
from camel.bots.discord.discord_installation import DiscordInstallation
from camel.logger import get_logger
from camel.utils import api_keys_required, dependencies_required
from .discord_store import DiscordBaseInstallationStore
if TYPE_CHECKING:
from discord import Message
logger = get_logger(__name__)
TOKEN_URL = "https://discord.com/api/oauth2/token"
USER_URL = "https://discord.com/api/users/@me"
[docs]
class DiscordApp:
r"""A class representing a Discord app that uses the `discord.py` library
to interact with Discord servers.
This bot can respond to messages in specific channels and only reacts to
messages that mention the bot.
Attributes:
channel_ids (Optional[List[int]]): A list of allowed channel IDs. If
provided, the bot will only respond to messages in these channels.
token (Optional[str]): The Discord bot token used for authentication.
"""
@dependencies_required('discord')
@api_keys_required(
[
("token", "DISCORD_BOT_TOKEN"),
]
)
def __init__(
self,
channel_ids: Optional[List[int]] = None,
token: Optional[str] = None,
client_id: Optional[str] = None,
client_secret: Optional[str] = None,
redirect_uri: Optional[str] = None,
installation_store: Optional[DiscordBaseInstallationStore] = None,
intents: Optional[discord.Intents] = None,
) -> None:
r"""Initialize the DiscordApp instance by setting up the Discord client
and event handlers.
Args:
channel_ids (Optional[List[int]]): A list of allowed channel IDs.
The bot will only respond to messages in these channels if
provided. (default: :obj:`None`)
token (Optional[str]): The Discord bot token for authentication.
If not provided, the token will be retrieved from the
environment variable `DISCORD_TOKEN`. (default: :obj:`None`)
client_id (str, optional): The client ID for Discord OAuth.
(default: :obj:`None`)
client_secret (Optional[str]): The client secret for Discord OAuth.
(default: :obj:`None`)
redirect_uri (str): The redirect URI for OAuth callbacks.
(default: :obj:`None`)
installation_store (DiscordAsyncInstallationStore): The database
stores all information of all installations.
(default: :obj:`None`)
intents (discord.Intents): The Discord intents of this app.
(default: :obj:`None`)
Raises:
ValueError: If the `DISCORD_BOT_TOKEN` is not found in environment
variables.
"""
self.token = token or os.getenv("DISCORD_BOT_TOKEN")
self.channel_ids = channel_ids
self.installation_store = installation_store
if not intents:
intents = discord.Intents.all()
intents.message_content = True
intents.guilds = True
self._client = discord.Client(intents=intents)
# Register event handlers
self._client.event(self.on_ready)
self._client.event(self.on_message)
# OAuth flow
self.client_id = client_id or os.getenv("DISCORD_CLIENT_ID")
self.client_secret = client_secret or os.getenv(
"DISCORD_CLIENT_SECRET"
)
self.redirect_uri = redirect_uri
self.oauth_flow = bool(
self.client_id
and self.client_secret
and self.redirect_uri
and self.installation_store
)
self.app = FastAPI()
[docs]
async def start(self):
r"""Asynchronously start the Discord bot using its token.
This method starts the bot and logs into Discord asynchronously using
the provided token. It should be awaited when used in an async
environment.
"""
await self._client.start(self.token)
[docs]
def run(self) -> None:
r"""Start the Discord bot using its token.
This method starts the bot and logs into Discord synchronously using
the provided token. It blocks execution and keeps the bot running.
"""
self._client.run(self.token) # type: ignore[arg-type]
[docs]
async def exchange_code_for_token_response(
self, code: str
) -> Optional[str]:
r"""Exchange the authorization code for an access token.
Args:
code (str): The authorization code received from Discord after
user authorization.
Returns:
Optional[str]: The access token if successful, otherwise None.
Raises:
ValueError: If OAuth configuration is incomplete or invalid.
httpx.RequestError: If there is a network issue during the request.
"""
if not self.oauth_flow:
logger.warning(
"OAuth is not enabled. Missing client_id, "
"client_secret, or redirect_uri."
)
return None
data = {
"client_id": self.client_id,
"client_secret": self.client_secret,
"grant_type": "authorization_code",
"code": code,
"redirect_uri": self.redirect_uri,
}
headers = {"Content-Type": "application/x-www-form-urlencoded"}
try:
async with httpx.AsyncClient() as client:
response = await client.post(
TOKEN_URL, data=data, headers=headers
)
if response.status_code != 200:
logger.error(f"Failed to exchange code: {response.text}")
return None
response_data = response.json()
return response_data
except (httpx.RequestError, ValueError) as e:
logger.error(f"Error during token fetch: {e}")
return None
[docs]
async def get_user_info(self, access_token: str) -> Optional[dict]:
r"""Retrieve user information using the access token.
Args:
access_token (str): The access token received from Discord.
Returns:
dict: The user information retrieved from Discord.
"""
if not self.oauth_flow:
logger.warning(
"OAuth is not enabled. Missing client_id, "
"client_secret, or redirect_uri."
)
return None
headers = {"Authorization": f"Bearer {access_token}"}
async with httpx.AsyncClient() as client:
user_response = await client.get(USER_URL, headers=headers)
return user_response.json()
[docs]
async def refresh_access_token(self, refresh_token: str) -> Optional[str]:
r"""Refresh the access token using a refresh token.
Args:
refresh_token (str): The refresh token issued by Discord that
can be used to obtain a new access token.
Returns:
Optional[str]: The new access token if successful, otherwise None.
"""
if not self.oauth_flow:
logger.warning(
"OAuth is not enabled. Missing client_id, "
"client_secret, or redirect_uri."
)
return None
data = {
"client_id": self.client_id,
"client_secret": self.client_secret,
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"redirect_uri": self.redirect_uri,
}
headers = {"Content-Type": "application/x-www-form-urlencoded"}
async with httpx.AsyncClient() as client:
response = await client.post(TOKEN_URL, data=data, headers=headers)
if response.status_code != 200:
logger.error(f"Failed to refresh token: {response.text}")
return None
response_data = response.json()
return response_data.get("access_token")
[docs]
async def get_valid_access_token(self, guild_id: str) -> Optional[str]:
r"""Retrieve a valid access token for the specified guild.
This method attempts to retrieve an access token for a specific guild.
If the current access token is expired, it will refresh the token using
the refresh token.
Args:
guild_id (str): The ID of the guild to retrieve the access
token for.
Returns:
Optional[str]: The valid access token if successful,
otherwise None.
"""
if not self.oauth_flow:
logger.warning(
"OAuth is not enabled. Missing client_id, "
"client_secret, or redirect_uri."
)
return None
assert self.installation_store is not None
installation = await self.installation_store.find_by_guild(
guild_id=guild_id
)
if not installation:
logger.error(f"No installation found for guild: {guild_id}")
return None
if (
installation.token_expires_at
and datetime.now() >= installation.token_expires_at
):
logger.info(
f"Access token expired for guild: {guild_id}, "
f"refreshing token..."
)
new_access_token = await self.refresh_access_token(
installation.refresh_token
)
if new_access_token:
installation.access_token = new_access_token
installation.token_expires_at = datetime.now() + timedelta(
seconds=3600
)
await self.installation_store.save(installation)
return new_access_token
else:
logger.error(
f"Failed to refresh access token for guild: {guild_id}"
)
return None
return installation.access_token
[docs]
async def save_installation(
self,
guild_id: str,
access_token: str,
refresh_token: str,
expires_in: int,
):
r"""Save the installation information for a given guild.
Args:
guild_id (str): The ID of the guild where the bot is installed.
access_token (str): The access token for the guild.
refresh_token (str): The refresh token for the guild.
expires_in: (int): The expiration time of the
access token.
"""
if not self.oauth_flow:
logger.warning(
"OAuth is not enabled. Missing client_id, "
"client_secret, or redirect_uri."
)
return None
assert self.installation_store is not None
expires_at = datetime.now() + timedelta(seconds=expires_in)
installation = DiscordInstallation(
guild_id=guild_id,
access_token=access_token,
refresh_token=refresh_token,
installed_at=datetime.now(),
token_expires_at=expires_at,
)
await self.installation_store.save(installation)
logger.info(f"Installation saved for guild: {guild_id}")
[docs]
async def remove_installation(self, guild: discord.Guild):
r"""Remove the installation for a given guild.
Args:
guild (discord.Guild): The guild from which the bot is
being removed.
"""
if not self.oauth_flow:
logger.warning(
"OAuth is not enabled. Missing client_id, "
"client_secret, or redirect_uri."
)
return None
assert self.installation_store is not None
await self.installation_store.delete(guild_id=str(guild.id))
print(f"Bot removed from guild: {guild.id}")
[docs]
async def on_ready(self) -> None:
r"""Event handler that is called when the bot has successfully
connected to the Discord server.
When the bot is ready and logged into Discord, it prints a message
displaying the bot's username.
"""
logger.info(f'We have logged in as {self._client.user}')
[docs]
async def on_message(self, message: 'Message') -> None:
r"""Event handler for processing incoming messages.
This method is called whenever a new message is received by the bot. It
will ignore messages sent by the bot itself, only respond to messages
in allowed channels (if specified), and only to messages that mention
the bot.
Args:
message (discord.Message): The message object received from
Discord.
"""
# If the message author is the bot itself,
# do not respond to this message
if message.author == self._client.user:
return
# If allowed channel IDs are provided,
# only respond to messages in those channels
if self.channel_ids and message.channel.id not in self.channel_ids:
return
# Only respond to messages that mention the bot
if not self._client.user or not self._client.user.mentioned_in(
message
):
return
logger.info(f"Received message: {message.content}")
@property
def client(self):
return self._client