Source code for camel.toolkits.terminal_toolkit

# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========

import atexit
import os
import platform
import queue
import subprocess
import sys
import threading
import venv
from queue import Queue
from typing import Any, Dict, List, Optional, Tuple

from camel.logger import get_logger
from camel.toolkits.base import BaseToolkit
from camel.toolkits.function_tool import FunctionTool
from camel.utils import MCPServer

logger = get_logger(__name__)


[docs] @MCPServer() class TerminalToolkit(BaseToolkit): r"""A toolkit for terminal operations across multiple operating systems. This toolkit provides a set of functions for terminal operations such as searching for files by name or content, executing shell commands, and managing terminal sessions. Args: timeout (Optional[float]): The timeout for terminal operations. shell_sessions (Optional[Dict[str, Any]]): A dictionary to store shell session information. If None, an empty dictionary will be used. (default: :obj:`{}`) working_dir (str): The working directory for operations. If specified, all execution and write operations will be restricted to this directory. Read operations can access paths outside this directory.(default: :obj:`"./workspace"`) need_terminal (bool): Whether to create a terminal interface. (default: :obj:`True`) use_shell_mode (bool): Whether to use shell mode for command execution. (default: :obj:`True`) clone_current_env (bool): Whether to clone the current Python environment.(default: :obj:`False`) safe_mode (bool): Whether to enable safe mode to restrict operations. (default: :obj:`True`) Note: Most functions are compatible with Unix-based systems (macOS, Linux). For Windows compatibility, additional implementation details are needed. """ def __init__( self, timeout: Optional[float] = None, shell_sessions: Optional[Dict[str, Any]] = None, working_dir: str = "./workspace", need_terminal: bool = True, use_shell_mode: bool = True, clone_current_env: bool = False, safe_mode: bool = True, ): super().__init__(timeout=timeout) self.shell_sessions = shell_sessions or {} self.os_type = platform.system() self.output_queue: Queue[str] = Queue() self.agent_queue: Queue[str] = Queue() self.terminal_ready = threading.Event() self.gui_thread = None self.safe_mode = safe_mode self.cloned_env_path = None self.use_shell_mode = use_shell_mode self.python_executable = sys.executable self.is_macos = platform.system() == 'Darwin' atexit.register(self.__del__) if not os.path.exists(working_dir): os.makedirs(working_dir, exist_ok=True) self.working_dir = os.path.abspath(working_dir) self._update_terminal_output( f"Working directory set to: {self.working_dir}\n" ) if self.safe_mode: self._update_terminal_output( "Safe mode enabled: Write operations can only " "be performed within the working directory\n" ) if clone_current_env: self.cloned_env_path = os.path.join(self.working_dir, ".venv") self._clone_current_environment() else: self.cloned_env_path = None if need_terminal: if self.is_macos: # macOS uses non-GUI mode logger.info("Detected macOS environment, using non-GUI mode") self._setup_file_output() self.terminal_ready.set() else: # Other platforms use normal GUI self.gui_thread = threading.Thread( target=self._create_terminal, daemon=True ) self.gui_thread.start() self.terminal_ready.wait(timeout=5) def _setup_file_output(self): r"""Set up file output to replace GUI, using a fixed file to simulate terminal. """ self.log_file = os.path.join(os.getcwd(), "camel_terminal.txt") if os.path.exists(self.log_file): with open(self.log_file, "w") as f: f.truncate(0) f.write("CAMEL Terminal Session\n") f.write("=" * 50 + "\n") f.write(f"Working Directory: {os.getcwd()}\n") f.write("=" * 50 + "\n\n") else: with open(self.log_file, "w") as f: f.write("CAMEL Terminal Session\n") f.write("=" * 50 + "\n") f.write(f"Working Directory: {os.getcwd()}\n") f.write("=" * 50 + "\n\n") # Inform the user logger.info(f"Terminal output redirected to: {self.log_file}") def file_update(output: str): try: # Directly append to the end of the file with open(self.log_file, "a") as f: f.write(output) # If the output does not end with a newline, add one if output and not output.endswith('\n'): f.write('\n') # Ensure the agent also receives the output self.agent_queue.put(output) except Exception as e: logger.error(f"Failed to write to terminal: {e}") # Replace the update method self._update_terminal_output = file_update def _clone_current_environment(self): r"""Create a new Python virtual environment.""" try: if os.path.exists(self.cloned_env_path): self._update_terminal_output( f"Using existing environment: {self.cloned_env_path}\n" ) return self._update_terminal_output( f"Creating new Python environment at:{self.cloned_env_path}\n" ) venv.create(self.cloned_env_path, with_pip=True) self._update_terminal_output( "New Python environment created successfully!\n" ) except Exception as e: self._update_terminal_output( f"Failed to create environment: {e!s}\n" ) logger.error(f"Failed to create environment: {e}") def _create_terminal(self): r"""Create a terminal GUI.""" try: import tkinter as tk from tkinter import scrolledtext def update_terminal(): try: while True: output = self.output_queue.get_nowait() if isinstance(output, bytes): output = output.decode('utf-8', errors='replace') self.terminal.insert(tk.END, output) self.terminal.see(tk.END) except queue.Empty: if hasattr(self, 'root') and self.root: self.root.after(100, update_terminal) self.root = tk.Tk() self.root.title(f"{self.os_type} Terminal") self.root.geometry("800x600") self.root.minsize(400, 300) self.terminal = scrolledtext.ScrolledText( self.root, wrap=tk.WORD, bg='black', fg='white', font=('Consolas', 10), insertbackground='white', # Cursor color ) self.terminal.pack(fill=tk.BOTH, expand=True) # Set the handling for closing the window def on_closing(): self.root.quit() self.root.destroy() self.root = None self.root.protocol("WM_DELETE_WINDOW", on_closing) # Start updating update_terminal() # Mark the terminal as ready self.terminal_ready.set() # Start the main loop self.root.mainloop() except Exception as e: logger.error(f"Failed to create terminal: {e}") self.terminal_ready.set() def _update_terminal_output(self, output: str): r"""Update terminal output and send to agent. Args: output (str): The output to be sent to the agent """ try: # If it is macOS , only write to file if self.is_macos: if hasattr(self, 'log_file'): with open(self.log_file, "a") as f: f.write(output) # Ensure the agent also receives the output self.agent_queue.put(output) return # For other cases, try to update the GUI (if it exists) if hasattr(self, 'root') and self.root: self.output_queue.put(output) # Always send to agent queue self.agent_queue.put(output) except Exception as e: logger.error(f"Failed to update terminal output: {e}") def _is_path_within_working_dir(self, path: str) -> bool: r"""Check if the path is within the working directory. Args: path (str): The path to check Returns: bool: Returns True if the path is within the working directory, otherwise returns False """ abs_path = os.path.abspath(path) return abs_path.startswith(self.working_dir) def _enforce_working_dir_for_execution(self, path: str) -> Optional[str]: r"""Enforce working directory restrictions, return error message if execution path is not within the working directory. Args: path (str): The path to be used for executing operations Returns: Optional[str]: Returns error message if the path is not within the working directory, otherwise returns None """ if not self._is_path_within_working_dir(path): return ( f"Operation restriction: Execution path {path} must " f"be within working directory {self.working_dir}" ) return None def _copy_external_file_to_workdir( self, external_file: str ) -> Optional[str]: r"""Copy external file to working directory. Args: external_file (str): The path of the external file Returns: Optional[str]: New path after copying to the working directory, returns None on failure """ try: import shutil filename = os.path.basename(external_file) new_path = os.path.join(self.working_dir, filename) shutil.copy2(external_file, new_path) return new_path except Exception as e: logger.error(f"Failed to copy file: {e}") return None
[docs] def file_find_in_content( self, file: str, regex: str, sudo: bool = False ) -> str: r"""Search for matching text within file content. Args: file (str): Absolute path of the file to search within. regex (str): Regular expression pattern to match. sudo (bool, optional): Whether to use sudo privileges. Defaults to False. Note: Using sudo requires the process to have appropriate permissions. Returns: str: Matching content found in the file. """ if not os.path.exists(file): return f"File not found: {file}" if not os.path.isfile(file): return f"The path provided is not a file: {file}" command = [] if sudo: error_msg = self._enforce_working_dir_for_execution(file) if error_msg: return error_msg command.extend(["sudo"]) if self.os_type in ['Darwin', 'Linux']: # macOS or Linux command.extend(["grep", "-E", regex, file]) else: # Windows # For Windows, we could use PowerShell or findstr command.extend(["findstr", "/R", regex, file]) try: result = subprocess.run( command, check=False, capture_output=True, text=True ) return result.stdout.strip() except subprocess.SubprocessError as e: logger.error(f"Error searching in file content: {e}") return f"Error: {e!s}"
[docs] def file_find_by_name(self, path: str, glob: str) -> str: r"""Find files by name pattern in specified directory. Args: path (str): Absolute path of directory to search. glob (str): Filename pattern using glob syntax wildcards. Returns: str: List of files matching the pattern. """ if not os.path.exists(path): return f"Directory not found: {path}" if not os.path.isdir(path): return f"The path provided is not a directory: {path}" command = [] if self.os_type in ['Darwin', 'Linux']: # macOS or Linux command.extend(["find", path, "-name", glob]) else: # Windows # For Windows, we use dir command with /s for recursive search # and /b for bare format pattern = glob file_path = os.path.join(path, pattern).replace('/', '\\') command.extend(["cmd", "/c", "dir", "/s", "/b", file_path]) try: result = subprocess.run( command, check=False, capture_output=True, text=True, shell=False, ) output = result.stdout.strip() if self.os_type == 'Windows': output = output.replace('\\', '/') return output except subprocess.SubprocessError as e: logger.error(f"Error finding files by name: {e}") return f"Error: {e!s}"
def _sanitize_command(self, command: str, exec_dir: str) -> Tuple: r"""Check and modify command to ensure safety. Args: command (str): The command to check exec_dir (str): The directory to execute the command in Returns: Tuple: (is safe, modified command or error message) """ if not self.safe_mode: return True, command if not command or command.strip() == "": return False, "Empty command" # Use shlex for safer command parsing import shlex try: parts = shlex.split(command) except ValueError as e: # Handle malformed commands (e.g., unbalanced quotes) return False, f"Invalid command format: {e}" if not parts: return False, "Empty command" # Get base command base_cmd = parts[0].lower() # Handle special commands if base_cmd in ['cd', 'chdir']: # Check if cd command attempts to leave the working directory if len(parts) > 1: target_dir = parts[1].strip('"\'') if ( target_dir.startswith('/') or target_dir.startswith('\\') or ':' in target_dir ): # Absolute path abs_path = os.path.abspath(target_dir) else: # Relative path abs_path = os.path.abspath( os.path.join(exec_dir, target_dir) ) if not self._is_path_within_working_dir(abs_path): return False, ( f"Safety restriction: Cannot change to directory " f"outside of working directory {self.working_dir}" ) # Check file operation commands elif base_cmd in [ 'rm', 'del', 'rmdir', 'rd', 'deltree', 'erase', 'unlink', 'shred', 'srm', 'wipe', 'remove', ]: # Check targets of delete commands for _, part in enumerate(parts[1:], 1): if part.startswith('-') or part.startswith( '/' ): # Skip options continue target = part.strip('"\'') if ( target.startswith('/') or target.startswith('\\') or ':' in target ): # Absolute path abs_path = os.path.abspath(target) else: # Relative path abs_path = os.path.abspath(os.path.join(exec_dir, target)) if not self._is_path_within_working_dir(abs_path): return False, ( f"Safety restriction: Cannot delete files outside " f"of working directory {self.working_dir}" ) # Check write/modify commands elif base_cmd in [ 'touch', 'mkdir', 'md', 'echo', 'cat', 'cp', 'copy', 'mv', 'move', 'rename', 'ren', 'write', 'output', ]: # Check for redirection symbols full_cmd = command.lower() if '>' in full_cmd: # Find the file path after redirection redirect_parts = command.split('>') if len(redirect_parts) > 1: output_file = ( redirect_parts[1].strip().split()[0].strip('"\'') ) if ( output_file.startswith('/') or output_file.startswith('\\') or ':' in output_file ): # Absolute path abs_path = os.path.abspath(output_file) else: # Relative path abs_path = os.path.abspath( os.path.join(exec_dir, output_file) ) if not self._is_path_within_working_dir(abs_path): return False, ( f"Safety restriction: Cannot write to file " f"outside of working directory {self.working_dir}" ) # For cp/mv commands, check target paths if base_cmd in ['cp', 'copy', 'mv', 'move']: # Simple handling, assuming the last parameter is the target if len(parts) > 2: target = parts[-1].strip('"\'') if ( target.startswith('/') or target.startswith('\\') or ':' in target ): # Absolute path abs_path = os.path.abspath(target) else: # Relative path abs_path = os.path.abspath( os.path.join(exec_dir, target) ) if not self._is_path_within_working_dir(abs_path): return False, ( f"Safety restriction: Cannot write to file " f"outside of working directory {self.working_dir}" ) # Check dangerous commands elif base_cmd in [ 'sudo', 'su', 'chmod', 'chown', 'chgrp', 'passwd', 'mkfs', 'fdisk', 'dd', 'shutdown', 'reboot', 'halt', 'poweroff', 'init', ]: return False, ( f"Safety restriction: Command '{base_cmd}' may affect system " f"security and is prohibited" ) # Check network commands elif base_cmd in ['ssh', 'telnet', 'ftp', 'sftp', 'nc', 'netcat']: return False, ( f"Safety restriction: Network command '{base_cmd}' " f"is prohibited" ) # Add copy functionality - copy from external to working directory elif base_cmd == 'safecopy': # Custom command: safecopy <source file> <target file> if len(parts) != 3: return False, "Usage: safecopy <source file> <target file>" source = parts[1].strip('\'"') target = parts[2].strip('\'"') # Check if source file exists if not os.path.exists(source): return False, f"Source file does not exist: {source}" # Ensure target is within working directory if ( target.startswith('/') or target.startswith('\\') or ':' in target ): # Absolute path abs_target = os.path.abspath(target) else: # Relative path abs_target = os.path.abspath(os.path.join(exec_dir, target)) if not self._is_path_within_working_dir(abs_target): return False, ( f"Safety restriction: Target file must be within " f"working directory {self.working_dir}" ) # Replace with safe copy command if self.os_type == 'Windows': return True, f"copy \"{source}\" \"{abs_target}\"" else: return True, f"cp \"{source}\" \"{abs_target}\"" return True, command
[docs] def shell_exec(self, id: str, command: str) -> str: r"""Execute commands. This can be used to execute various commands, such as writing code, executing code, and running commands. Args: id (str): Unique identifier of the target shell session. command (str): Shell command to execute. Returns: str: Output of the command execution or error message. """ # Command execution must be within the working directory error_msg = self._enforce_working_dir_for_execution(self.working_dir) if error_msg: return error_msg if self.safe_mode: is_safe, sanitized_command = self._sanitize_command( command, self.working_dir ) if not is_safe: return f"Command rejected: {sanitized_command}" command = sanitized_command # If the session does not exist, create a new session if id not in self.shell_sessions: self.shell_sessions[id] = { "process": None, "output": "", "running": False, } try: # First, log the command to be executed self._update_terminal_output(f"\n$ {command}\n") if command.startswith('python') or command.startswith('pip'): if self.cloned_env_path: if self.os_type == 'Windows': base_path = os.path.join( self.cloned_env_path, "Scripts" ) python_path = os.path.join(base_path, "python.exe") pip_path = os.path.join(base_path, "pip.exe") else: base_path = os.path.join(self.cloned_env_path, "bin") python_path = os.path.join(base_path, "python") pip_path = os.path.join(base_path, "pip") else: python_path = self.python_executable pip_path = f'"{python_path}" -m pip' if command.startswith('python'): command = command.replace('python', f'"{python_path}"', 1) elif command.startswith('pip'): command = command.replace('pip', pip_path, 1) if self.is_macos: # Type safe version - macOS uses subprocess.run process = subprocess.run( command, shell=True, cwd=self.working_dir, capture_output=True, text=True, env=os.environ.copy(), ) # Process the output output = process.stdout or "" if process.stderr: output += f"\nStderr Output:\n{process.stderr}" # Update session information and terminal self.shell_sessions[id]["output"] = output self._update_terminal_output(output + "\n") return output else: # Non-macOS systems use the Popen method proc = subprocess.Popen( command, shell=True, cwd=self.working_dir, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE, text=True, bufsize=1, universal_newlines=True, env=os.environ.copy(), ) # Store the process and mark it as running self.shell_sessions[id]["process"] = proc self.shell_sessions[id]["running"] = True # Get output stdout, stderr = proc.communicate() output = stdout or "" if stderr: output += f"\nStderr Output:\n{stderr}" # Update session information and terminal self.shell_sessions[id]["output"] = output self._update_terminal_output(output + "\n") return output except Exception as e: error_msg = f"Command execution error: {e!s}" logger.error(error_msg) self._update_terminal_output(f"\nError: {error_msg}\n") # More detailed error information import traceback detailed_error = traceback.format_exc() return ( f"Error: {error_msg}\n\n" f"Detailed information: {detailed_error}" )
[docs] def shell_view(self, id: str) -> str: r"""View the content of a specified shell session. Args: id (str): Unique identifier of the target shell session. Returns: str: Current output content of the shell session. """ if id not in self.shell_sessions: return f"Shell session not found: {id}" session = self.shell_sessions[id] try: # Check process status if session["process"].poll() is not None: session["running"] = False # Collect all new output from agent queue new_output = "" try: while True: output = self.agent_queue.get_nowait() new_output += output session["output"] += output except queue.Empty: pass return new_output or session["output"] except Exception as e: error_msg = f"Error reading terminal output: {e}" self._update_terminal_output(f"\nError: {error_msg}\n") logger.error(error_msg) return f"Error: {e!s}"
[docs] def shell_wait(self, id: str, seconds: Optional[int] = None) -> str: r"""Wait for the running process in a specified shell session to return. Args: id (str): Unique identifier of the target shell session. seconds (Optional[int], optional): Wait duration in seconds. If None, wait indefinitely. Defaults to None. Returns: str: Final output content after waiting. """ if id not in self.shell_sessions: return f"Shell session not found: {id}" session = self.shell_sessions[id] process = session.get("process") if process is None: return f"No active process in session '{id}'" if not session["running"] or process.poll() is not None: return f"Process in session '{id}' is not running" try: if hasattr(process, 'communicate'): # Use communicate with timeout stdout, stderr = process.communicate(timeout=seconds) if stdout: stdout_str = ( stdout.decode('utf-8') if isinstance(stdout, bytes) else stdout ) session["output"] += stdout_str if stderr: stderr_str = ( stderr.decode('utf-8') if isinstance(stderr, bytes) else stderr ) if stderr_str: session["output"] += f"\nStderr Output:\n{stderr_str}" session["running"] = False return ( f"Process completed in session '{id}'. " f"Output: {session['output']}" ) else: return ( f"Process already completed in session '{id}'. " f"Output: {session['output']}" ) except subprocess.TimeoutExpired: return ( f"Process in session '{id}' is still running " f"after {seconds} seconds" ) except Exception as e: logger.error(f"Error waiting for process: {e}") return f"Error waiting for process: {e!s}"
[docs] def shell_write_to_process( self, id: str, input: str, press_enter: bool ) -> str: r"""Write input to a running process in a specified shell session. Args: id (str): Unique identifier of the target shell session. input (str): Input content to write to the process. press_enter (bool): Whether to press Enter key after input. Returns: str: Status message indicating whether the input was sent. """ if id not in self.shell_sessions: return f"Shell session not found: {id}" session = self.shell_sessions[id] process = session.get("process") if process is None: return f"No active process in session '{id}'" if not session["running"] or process.poll() is not None: return f"Process in session '{id}' is not running" try: if not process.stdin or process.stdin.closed: return ( f"Cannot write to process in session '{id}': " f"stdin is closed" ) if press_enter: input = input + "\n" # Write bytes to stdin process.stdin.write(input.encode('utf-8')) process.stdin.flush() return f"Input sent to process in session '{id}'" except Exception as e: logger.error(f"Error writing to process: {e}") return f"Error writing to process: {e!s}"
[docs] def shell_kill_process(self, id: str) -> str: r"""Terminate a running process in a specified shell session. Args: id (str): Unique identifier of the target shell session. Returns: str: Status message indicating whether the process was terminated. """ if id not in self.shell_sessions: return f"Shell session not found: {id}" session = self.shell_sessions[id] process = session.get("process") if process is None: return f"No active process in session '{id}'" if not session["running"] or process.poll() is not None: return f"Process in session '{id}' is not running" try: # Clean up process resources before termination if process.stdin and not process.stdin.closed: process.stdin.close() process.terminate() try: process.wait(timeout=5) except subprocess.TimeoutExpired: logger.warning( f"Process in session '{id}' did not terminate gracefully" f", forcing kill" ) process.kill() session["running"] = False return f"Process in session '{id}' has been terminated" except Exception as e: logger.error(f"Error killing process: {e}") return f"Error killing process: {e!s}"
def __del__(self): r"""Clean up resources when the object is being destroyed. Terminates all running processes and closes any open file handles. """ # Log that cleanup is starting logger.info("TerminalToolkit cleanup initiated") # Clean up all processes in shell sessions for session_id, session in self.shell_sessions.items(): process = session.get("process") if process is not None and session.get("running", False): try: logger.info( f"Terminating process in session '{session_id}'" ) # Close process input/output streams if open if ( hasattr(process, 'stdin') and process.stdin and not process.stdin.closed ): process.stdin.close() # Terminate the process process.terminate() try: # Give the process a short time to terminate gracefully process.wait(timeout=3) except subprocess.TimeoutExpired: # Force kill if the process doesn't terminate # gracefully logger.warning( f"Process in session '{session_id}' did not " f"terminate gracefully, forcing kill" ) process.kill() # Mark the session as not running session["running"] = False except Exception as e: logger.error( f"Error cleaning up process in session " f"'{session_id}': {e}" ) # Close file output if it exists if hasattr(self, 'log_file') and self.is_macos: try: logger.info(f"Final terminal log saved to: {self.log_file}") except Exception as e: logger.error(f"Error logging file information: {e}") # Clean up GUI resources if they exist if hasattr(self, 'root') and self.root: try: logger.info("Closing terminal GUI") self.root.quit() self.root.destroy() except Exception as e: logger.error(f"Error closing terminal GUI: {e}") logger.info("TerminalToolkit cleanup completed")
[docs] def get_tools(self) -> List[FunctionTool]: r"""Returns a list of FunctionTool objects representing the functions in the toolkit. Returns: List[FunctionTool]: A list of FunctionTool objects representing the functions in the toolkit. """ return [ FunctionTool(self.file_find_in_content), FunctionTool(self.file_find_by_name), FunctionTool(self.shell_exec), FunctionTool(self.shell_view), FunctionTool(self.shell_wait), FunctionTool(self.shell_write_to_process), FunctionTool(self.shell_kill_process), ]