Source code for camel.toolkits.terminal_toolkit

# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========

import os
import subprocess
from typing import Any, Dict, List, Optional

from camel.logger import get_logger
from camel.toolkits.base import BaseToolkit
from camel.toolkits.function_tool import FunctionTool

logger = get_logger(__name__)


[docs] class TerminalToolkit(BaseToolkit): r"""A toolkit for terminal operations across multiple operating systems. This toolkit provides a set of functions for terminal operations such as searching for files by name or content, executing shell commands, and managing terminal sessions. Args: timeout (Optional[float]): The timeout for terminal operations. shell_sessions (Optional[Dict[str, Any]]): A dictionary to store shell session information. If None, an empty dictionary will be used. Note: Most functions are compatible with Unix-based systems (macOS, Linux). For Windows compatibility, additional implementation details are needed. """ def __init__( self, timeout: Optional[float] = None, shell_sessions: Optional[Dict[str, Any]] = None, ): import platform super().__init__(timeout=timeout) self.shell_sessions = shell_sessions or {} self.os_type = ( platform.system() ) # 'Windows', 'Darwin' (macOS), 'Linux'
[docs] def file_find_in_content( self, file: str, regex: str, sudo: bool = False ) -> str: r"""Search for matching text within file content. Args: file (str): Absolute path of the file to search within. regex (str): Regular expression pattern to match. sudo (bool, optional): Whether to use sudo privileges. Defaults to False. Note: Using sudo requires the process to have appropriate permissions. Returns: str: Matching content found in the file. """ if not os.path.exists(file): return f"File not found: {file}" if not os.path.isfile(file): return f"The path provided is not a file: {file}" command = [] if sudo: command.extend(["sudo"]) if self.os_type in ['Darwin', 'Linux']: # macOS or Linux command.extend(["grep", "-E", regex, file]) else: # Windows # For Windows, we could use PowerShell or findstr command.extend(["findstr", "/R", regex, file]) try: result = subprocess.run( command, check=False, capture_output=True, text=True ) return result.stdout.strip() except subprocess.SubprocessError as e: logger.error(f"Error searching in file content: {e}") return f"Error: {e!s}"
[docs] def file_find_by_name(self, path: str, glob: str) -> str: r"""Find files by name pattern in specified directory. Args: path (str): Absolute path of directory to search. glob (str): Filename pattern using glob syntax wildcards. Returns: str: List of files matching the pattern. """ if not os.path.exists(path): return f"Directory not found: {path}" if not os.path.isdir(path): return f"The path provided is not a directory: {path}" command = [] if self.os_type in ['Darwin', 'Linux']: # macOS or Linux command.extend(["find", path, "-name", glob]) else: # Windows # For Windows, we use dir command with /s for recursive search # and /b for bare format pattern = glob file_path = os.path.join(path, pattern).replace('/', '\\') command.extend(["cmd", "/c", "dir", "/s", "/b", file_path]) try: result = subprocess.run( command, check=False, capture_output=True, text=True ) return result.stdout.strip() except subprocess.SubprocessError as e: logger.error(f"Error finding files by name: {e}") return f"Error: {e!s}"
[docs] def shell_exec(self, id: str, exec_dir: str, command: str) -> str: r"""Execute commands in a specified shell session. Args: id (str): Unique identifier of the target shell session. exec_dir (str): Working directory for command execution (must use absolute path). command (str): Shell command to execute. Returns: str: Output of the command execution or error message. """ if not os.path.isabs(exec_dir): return f"exec_dir must be an absolute path: {exec_dir}" if not os.path.exists(exec_dir): return f"Directory not found: {exec_dir}" # If the session doesn't exist, create a new one if id not in self.shell_sessions: self.shell_sessions[id] = { "process": None, "output": "", "running": False, } try: # Execute the command in the specified directory process = subprocess.Popen( command, shell=True, cwd=exec_dir, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE, text=False, ) # Store the process and mark as running self.shell_sessions[id]["process"] = process self.shell_sessions[id]["running"] = True self.shell_sessions[id]["output"] = "" # Get initial output (non-blocking) stdout, stderr = "", "" try: if process.stdout: stdout = process.stdout.read().decode('utf-8') if process.stderr: stderr = process.stderr.read().decode('utf-8') except Exception as e: logger.error(f"Error reading initial output: {e}") return f"Error: {e!s}" output = stdout if stderr: output += f"\nErrors:\n{stderr}" self.shell_sessions[id]["output"] = output return ( f"Command started in session '{id}'. Initial output: {output}" ) except subprocess.SubprocessError as e: self.shell_sessions[id]["running"] = False error_msg = f"Error executing command: {e}" self.shell_sessions[id]["output"] = error_msg logger.error(error_msg) return error_msg
[docs] def shell_view(self, id: str) -> str: r"""View the content of a specified shell session. Args: id (str): Unique identifier of the target shell session. Returns: str: Current output content of the shell session. """ if id not in self.shell_sessions: return f"Shell session not found: {id}" session = self.shell_sessions[id] process = session.get("process") if process is None: return f"No active process in session '{id}'" # Try to get any new output if session["running"] and process.poll() is None: try: # Non-blocking read from stdout/stderr stdout_data, stderr_data = "", "" if process.stdout and process.stdout.readable(): stdout_data = process.stdout.read1().decode('utf-8') if process.stderr and process.stderr.readable(): stderr_data = process.stderr.read1().decode('utf-8') if stdout_data: session["output"] += stdout_data if stderr_data: session["output"] += f"\nErrors:\n{stderr_data}" except Exception as e: logger.error(f"Error getting process output: {e}") return f"Error: {e!s}" # Check if the process has completed if process.poll() is not None and session["running"]: try: # Get remaining output if any stdout_data, stderr_data = "", "" if process.stdout and process.stdout.readable(): stdout_data = process.stdout.read().decode('utf-8') if process.stderr and process.stderr.readable(): stderr_data = process.stderr.read().decode('utf-8') if stdout_data: session["output"] += stdout_data if stderr_data: session["output"] += f"\nErrors:\n{stderr_data}" except Exception as e: logger.error(f"Error getting final process output: {e}") return f"Error: {e!s}" finally: session["running"] = False return session["output"]
[docs] def shell_wait(self, id: str, seconds: Optional[int] = None) -> str: r"""Wait for the running process in a specified shell session to return. Args: id (str): Unique identifier of the target shell session. seconds (Optional[int], optional): Wait duration in seconds. If None, wait indefinitely. Defaults to None. Returns: str: Final output content after waiting. """ if id not in self.shell_sessions: return f"Shell session not found: {id}" session = self.shell_sessions[id] process = session.get("process") if process is None: return f"No active process in session '{id}'" if not session["running"]: return f"Process in session '{id}' is not running" try: # Use communicate with timeout stdout, stderr = process.communicate(timeout=seconds) if stdout: stdout_str = ( stdout.decode('utf-8') if isinstance(stdout, bytes) else stdout ) session["output"] += stdout_str if stderr: stderr_str = ( stderr.decode('utf-8') if isinstance(stderr, bytes) else stderr ) session["output"] += f"\nErrors:\n{stderr_str}" session["running"] = False return ( f"Process completed in session '{id}'. " f"Output: {session['output']}" ) except subprocess.TimeoutExpired: return ( f"Process in session '{id}' is still running " f"after {seconds} seconds" ) except Exception as e: logger.error(f"Error waiting for process: {e}") return f"Error waiting for process: {e!s}"
[docs] def shell_write_to_process( self, id: str, input: str, press_enter: bool ) -> str: r"""Write input to a running process in a specified shell session. Args: id (str): Unique identifier of the target shell session. input (str): Input content to write to the process. press_enter (bool): Whether to press Enter key after input. Returns: str: Status message indicating whether the input was sent. """ if id not in self.shell_sessions: return f"Shell session not found: {id}" session = self.shell_sessions[id] process = session.get("process") if process is None: return f"No active process in session '{id}'" if not session["running"] or process.poll() is not None: return f"Process in session '{id}' is not running" try: if not process.stdin or process.stdin.closed: return ( f"Cannot write to process in session '{id}': " f"stdin is closed" ) if press_enter: input = input + "\n" # Write bytes to stdin process.stdin.write(input.encode('utf-8')) process.stdin.flush() return f"Input sent to process in session '{id}'" except Exception as e: logger.error(f"Error writing to process: {e}") return f"Error writing to process: {e!s}"
[docs] def shell_kill_process(self, id: str) -> str: r"""Terminate a running process in a specified shell session. Args: id (str): Unique identifier of the target shell session. Returns: str: Status message indicating whether the process was terminated. """ if id not in self.shell_sessions: return f"Shell session not found: {id}" session = self.shell_sessions[id] process = session.get("process") if process is None: return f"No active process in session '{id}'" if not session["running"] or process.poll() is not None: return f"Process in session '{id}' is not running" try: # Clean up process resources before termination if process.stdin and not process.stdin.closed: process.stdin.close() process.terminate() try: process.wait(timeout=5) except subprocess.TimeoutExpired: logger.warning( f"Process in session '{id}' did not terminate gracefully" f", forcing kill" ) process.kill() session["running"] = False return f"Process in session '{id}' has been terminated" except Exception as e: logger.error(f"Error killing process: {e}") return f"Error killing process: {e!s}"
[docs] def get_tools(self) -> List[FunctionTool]: r"""Returns a list of FunctionTool objects representing the functions in the toolkit. Returns: List[FunctionTool]: A list of FunctionTool objects representing the functions in the toolkit. """ return [ FunctionTool(self.file_find_in_content), FunctionTool(self.file_find_by_name), FunctionTool(self.shell_exec), FunctionTool(self.shell_view), FunctionTool(self.shell_wait), FunctionTool(self.shell_write_to_process), FunctionTool(self.shell_kill_process), ]