# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
import os
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import Any, ClassVar, Dict, List
from colorama import Fore
from camel.interpreters.base import BaseInterpreter
from camel.interpreters.interpreter_error import InterpreterError
from camel.logger import get_logger
logger = get_logger(__name__)
[docs]
class SubprocessInterpreter(BaseInterpreter):
r"""SubprocessInterpreter is a class for executing code files or code
strings in a subprocess.
This class handles the execution of code in different scripting languages
(currently Python and Bash) within a subprocess, capturing their
stdout and stderr streams, and allowing user checking before executing code
strings.
Args:
require_confirm (bool, optional): If True, prompt user before running
code strings for security. (default: :obj:`True`)
print_stdout (bool, optional): If True, print the standard output of
the executed code. (default: :obj:`False`)
print_stderr (bool, optional): If True, print the standard error of the
executed code. (default: :obj:`True`)
execution_timeout (int, optional): Maximum time in seconds to wait for
code execution to complete. (default: :obj:`60`)
"""
_CODE_EXECUTE_CMD_MAPPING: ClassVar[Dict[str, Dict[str, str]]] = {
"python": {"posix": "python {file_name}", "nt": "python {file_name}"},
"bash": {"posix": "bash {file_name}", "nt": "bash {file_name}"},
"r": {"posix": "Rscript {file_name}", "nt": "Rscript {file_name}"},
}
_CODE_EXTENSION_MAPPING: ClassVar[Dict[str, str]] = {
"python": "py",
"bash": "sh",
"r": "R",
}
_CODE_TYPE_MAPPING: ClassVar[Dict[str, str]] = {
"python": "python",
"py3": "python",
"python3": "python",
"py": "python",
"shell": "bash",
"bash": "bash",
"sh": "bash",
"r": "r",
"R": "r",
}
def __init__(
self,
require_confirm: bool = True,
print_stdout: bool = False,
print_stderr: bool = True,
execution_timeout: int = 60,
) -> None:
self.require_confirm = require_confirm
self.print_stdout = print_stdout
self.print_stderr = print_stderr
self.execution_timeout = execution_timeout
[docs]
def run_file(
self,
file: Path,
code_type: str,
) -> str:
r"""Executes a code file in a subprocess and captures its output.
Args:
file (Path): The path object of the file to run.
code_type (str): The type of code to execute (e.g., 'python',
'bash').
Returns:
str: A string containing the captured stdout and stderr of the
executed code.
"""
if not file.is_file():
return f"{file} is not a file."
code_type = self._check_code_type(code_type)
if self._CODE_TYPE_MAPPING[code_type] == "python":
# For Python code, use ast to analyze and modify the code
import ast
import astor
with open(file, 'r', encoding='utf-8') as f:
source = f.read()
# Parse the source code
try:
tree = ast.parse(source)
# Get the last node
if tree.body:
last_node = tree.body[-1]
# Handle expressions that would normally not produce output
# For example: In a REPL, typing '1 + 2' should show '3'
if isinstance(last_node, ast.Expr):
# Only wrap in print(repr()) if it's not already a
# print call
if not (
isinstance(last_node.value, ast.Call)
and isinstance(last_node.value.func, ast.Name)
and last_node.value.func.id == 'print'
):
# Transform the AST to wrap the expression in print
# (repr())
# Example transformation:
# Before: x + y
# After: print(repr(x + y))
tree.body[-1] = ast.Expr(
value=ast.Call(
# Create print() function call
func=ast.Name(id='print', ctx=ast.Load()),
args=[
ast.Call(
# Create repr() function call
func=ast.Name(
id='repr', ctx=ast.Load()
),
# Pass the original expression as
# argument to repr()
args=[last_node.value],
keywords=[],
)
],
keywords=[],
)
)
# Fix missing source locations
ast.fix_missing_locations(tree)
# Convert back to source
modified_source = astor.to_source(tree)
# Create a temporary file with the modified source
temp_file = self._create_temp_file(modified_source, "py")
cmd = ["python", str(temp_file)]
except (SyntaxError, TypeError, ValueError) as e:
logger.warning(f"Failed to parse Python code with AST: {e}")
platform_type = 'posix' if os.name != 'nt' else 'nt'
cmd_template = self._CODE_EXECUTE_CMD_MAPPING[code_type][
platform_type
]
base_cmd = cmd_template.split()[0]
# Check if command is available
if not self._is_command_available(base_cmd):
raise InterpreterError(
f"Command '{base_cmd}' not found. Please ensure it "
f"is installed and available in your PATH."
)
cmd = [base_cmd, str(file)]
else:
# For non-Python code, use standard execution
platform_type = 'posix' if os.name != 'nt' else 'nt'
cmd_template = self._CODE_EXECUTE_CMD_MAPPING[code_type][
platform_type
]
base_cmd = cmd_template.split()[0] # Get 'python', 'bash', etc.
# Check if command is available
if not self._is_command_available(base_cmd):
raise InterpreterError(
f"Command '{base_cmd}' not found. Please ensure it "
f"is installed and available in your PATH."
)
cmd = [base_cmd, str(file)]
# Get current Python executable's environment
env = os.environ.copy()
# On Windows, ensure we use the correct Python executable path
if os.name == 'nt':
python_path = os.path.dirname(sys.executable)
if 'PATH' in env:
env['PATH'] = python_path + os.pathsep + env['PATH']
else:
env['PATH'] = python_path
try:
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
env=env,
shell=False, # Never use shell=True for security
)
# Add timeout to prevent hanging processes
stdout, stderr = proc.communicate(timeout=self.execution_timeout)
return_code = proc.returncode
except subprocess.TimeoutExpired:
proc.kill()
stdout, stderr = proc.communicate()
return_code = proc.returncode
timeout_msg = (
f"Process timed out after {self.execution_timeout} seconds "
f"and was terminated."
)
stderr = f"{stderr}\n{timeout_msg}"
# Clean up temporary file if it was created
temp_file_to_clean = locals().get('temp_file')
if temp_file_to_clean is not None:
try:
if temp_file_to_clean.exists():
try:
temp_file_to_clean.unlink()
except PermissionError:
# On Windows, files might be locked
logger.warning(
f"Could not delete temp file "
f"{temp_file_to_clean} (may be locked)"
)
except Exception as e:
logger.warning(f"Failed to cleanup temporary file: {e}")
if self.print_stdout and stdout:
print("======stdout======")
print(Fore.GREEN + stdout + Fore.RESET)
print("==================")
if self.print_stderr and stderr:
print("======stderr======")
print(Fore.RED + stderr + Fore.RESET)
print("==================")
# Build the execution result
exec_result = ""
if stdout:
exec_result += stdout
if stderr:
exec_result += f"(stderr: {stderr})"
if return_code != 0:
error_msg = f"(Execution failed with return code {return_code})"
if not stderr:
exec_result += error_msg
elif error_msg not in stderr:
exec_result += error_msg
return exec_result
[docs]
def run(
self,
code: str,
code_type: str,
) -> str:
r"""Generates a temporary file with the given code, executes it, and
deletes the file afterward.
Args:
code (str): The code string to execute.
code_type (str): The type of code to execute (e.g., 'python',
'bash').
Returns:
str: A string containing the captured stdout and stderr of the
executed code.
Raises:
InterpreterError: If the user declines to run the code or if the
code type is unsupported.
"""
code_type = self._check_code_type(code_type)
# Print code for security checking
if self.require_confirm:
logger.info(
f"The following {code_type} code will run on your "
f"computer: {code}"
)
while True:
choice = input("Running code? [Y/n]:").lower().strip()
if choice in ["y", "yes", "ye", ""]:
break
elif choice in ["no", "n"]:
raise InterpreterError(
"Execution halted: User opted not to run the code. "
"This choice stops the current operation and any "
"further code execution."
)
else:
print("Please enter 'y' or 'n'.")
temp_file_path = None
temp_dir = None
try:
temp_file_path = self._create_temp_file(
code=code, extension=self._CODE_EXTENSION_MAPPING[code_type]
)
temp_dir = temp_file_path.parent
return self.run_file(temp_file_path, code_type)
finally:
# Clean up temp file and directory
try:
if temp_file_path and temp_file_path.exists():
try:
temp_file_path.unlink()
except PermissionError:
# On Windows, files might be locked
logger.warning(
f"Could not delete temp file {temp_file_path}"
)
if temp_dir and temp_dir.exists():
try:
import shutil
shutil.rmtree(temp_dir, ignore_errors=True)
except Exception as e:
logger.warning(f"Could not delete temp directory: {e}")
except Exception as e:
logger.warning(f"Error during cleanup: {e}")
def _create_temp_file(self, code: str, extension: str) -> Path:
r"""Creates a temporary file with the given code and extension.
Args:
code (str): The code to write to the temporary file.
extension (str): The file extension to use.
Returns:
Path: The path to the created temporary file.
"""
try:
# Create a temporary directory first to ensure we have write
# permissions
temp_dir = tempfile.mkdtemp()
# Create file path with appropriate extension
file_path = Path(temp_dir) / f"temp_code.{extension}"
# Write code to file with appropriate encoding
with open(file_path, 'w', encoding='utf-8') as f:
f.write(code)
return file_path
except Exception as e:
# Clean up temp directory if creation failed
if 'temp_dir' in locals():
try:
import shutil
shutil.rmtree(temp_dir, ignore_errors=True)
except Exception:
pass
logger.error(f"Failed to create temporary file: {e}")
raise
def _check_code_type(self, code_type: str) -> str:
if code_type not in self._CODE_TYPE_MAPPING:
raise InterpreterError(
f"Unsupported code type {code_type}. Currently "
f"`{self.__class__.__name__}` only supports "
f"{', '.join(self._CODE_EXTENSION_MAPPING.keys())}."
)
return self._CODE_TYPE_MAPPING[code_type]
[docs]
def supported_code_types(self) -> List[str]:
r"""Provides supported code types by the interpreter."""
return list(self._CODE_EXTENSION_MAPPING.keys())
[docs]
def update_action_space(self, action_space: Dict[str, Any]) -> None:
r"""Updates action space for *python* interpreter"""
raise RuntimeError(
"SubprocessInterpreter doesn't support " "`action_space`."
)
def _is_command_available(self, command: str) -> bool:
r"""Check if a command is available in the system PATH.
Args:
command (str): The command to check.
Returns:
bool: True if the command is available, False otherwise.
"""
if os.name == 'nt': # Windows
# On Windows, use where.exe to find the command
try:
with open(os.devnull, 'w') as devnull:
subprocess.check_call(
['where', command],
stdout=devnull,
stderr=devnull,
shell=False,
)
return True
except subprocess.CalledProcessError:
return False
else: # Unix-like systems
# On Unix-like systems, use which to find the command
try:
with open(os.devnull, 'w') as devnull:
subprocess.check_call(
['which', command],
stdout=devnull,
stderr=devnull,
shell=False,
)
return True
except subprocess.CalledProcessError:
return False