# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
import asyncio
import time
from abc import ABC, abstractmethod
from typing import List, Optional
from camel.extractors.base import BaseExtractor
from camel.logger import get_logger
from camel.utils import BatchProcessor
from .models import VerificationOutcome, VerificationResult
logger = get_logger(__name__)
class BaseVerifier(ABC):
r"""Base class for all verifiers.
Example:
```python
verifier = MyVerifier()
await verifier.setup()
result = await verifier.verify(response)
await verifier.cleanup()
```
Key Features:
- Async verification with retry logic
- Comprehensive error handling and logging
- Configurable batch processing
- Resource monitoring for adaptive scaling
"""
def __init__(
self,
extractor: Optional[BaseExtractor] = None,
max_parallel: Optional[int] = None,
timeout: Optional[float] = None,
max_retries: int = 3,
retry_delay: float = 1.0,
initial_batch_size: Optional[int] = None,
cpu_threshold: float = 80.0,
memory_threshold: float = 85.0,
**kwargs,
):
r"""Initialize the verifier with configuration parameters.
Args:
extractor: Optional extractor used to pull the verifiable part out
of a solution before verification. If None, the raw solution
is verified as-is. (default: :obj:`None`)
max_parallel: Maximum number of parallel verifications. If None,
determined dynamically based on system resources.
(default: :obj:`None`)
timeout: Timeout in seconds for each verification. (default:
:obj:`None`)
max_retries: Maximum number of retry attempts. (default: :obj:`3`)
retry_delay: Delay between retries in seconds. (default:
:obj:`1.0`)
initial_batch_size: Initial size for batch processing. If None,
defaults to 10. (default: :obj:`None`)
cpu_threshold: CPU usage percentage threshold for scaling down.
(default: :obj:`80.0`)
memory_threshold: Memory usage percentage threshold for scaling
down. (default: :obj:`85.0`)
**kwargs: Additional verifier parameters.
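Example:
A configuration sketch, assuming a concrete subclass
``MyVerifier`` as in the class-level example:
```python
verifier = MyVerifier(
    max_parallel=4,
    timeout=30.0,
    max_retries=2,
    retry_delay=0.5,
)
```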
"""
self.extractor = extractor
self._is_setup: bool = False
self._max_parallel: Optional[int] = max_parallel
self._timeout: Optional[float] = timeout
self._max_retries: int = max_retries
self._retry_delay: float = retry_delay
self._initial_batch_size: Optional[int] = initial_batch_size
self._cpu_threshold: float = cpu_threshold
self._memory_threshold: float = memory_threshold
self._batch_processor: BatchProcessor = BatchProcessor()
async def setup(self, **kwargs) -> None:
r"""Set up the verifier with necessary resources.
Initializes:
1. Batch processor with validated parameters
2. Any verifier-specific resources
Raises:
RuntimeError: If setup fails or resources cannot be initialized.
"""
if self._is_setup:
logger.debug(f"{self.__class__.__name__} already initialized")
return
try:
if self.extractor:
await self.extractor.setup()
batch_size = max(1, self._initial_batch_size or 10)
max_parallel = max(1, self._max_parallel or 1)
self._batch_processor = BatchProcessor()
logger.info(
f"{self.__class__.__name__} initialized with "
f"batch_size={batch_size}, max_parallel={max_parallel}"
)
await self._setup(**kwargs)
self._is_setup = True
except Exception as e:
error_msg = (
f"Failed to initialize {self.__class__.__name__}: {e!s}"
)
logger.error(error_msg, exc_info=True)
await self.cleanup()
raise RuntimeError(error_msg) from e
@abstractmethod
async def _setup(self, **kwargs) -> None:
r"""Implement verifier-specific setup logic."""
pass
async def cleanup(self) -> None:
r"""Clean up verifier resources.
Ensures:
1. Batch processor is reset
2. All internal states are cleared
Raises:
RuntimeError: If cleanup fails.
"""
if not self._is_setup:
return
try:
if self.extractor:
await self.extractor.cleanup()
self._batch_processor = BatchProcessor()
await self._cleanup()
logger.info(f"{self.__class__.__name__} cleaned up successfully")
except Exception as e:
error_msg = f"Failed to cleanup {self.__class__.__name__}: {e!s}"
logger.error(error_msg, exc_info=True)
raise RuntimeError(error_msg) from e
finally:
self._is_setup = False
@abstractmethod
async def _cleanup(self) -> None:
r"""Implement verifier-specific cleanup logic."""
pass
async def verify(
self, solution: str, ground_truth: Optional[str]
) -> VerificationResult:
r"""Perform verification with full error handling.
This method verifies the correctness of a generated solution by
comparing it against the provided ground truth. It handles
execution errors, timeouts, and retry attempts to ensure robust
validation.
Args:
solution (str): The generated response that needs verification.
ground_truth (Optional[str]): The expected correct answer to
compare against.
Returns:
VerificationResult: A structured object containing:
- status (SUCCESS/FAILURE/ERROR/TIMEOUT)
- result (str): The verification outcome or processed output.
- duration (float): Time taken for verification.
- metadata (dict): Additional details such as retry attempts.
- error_message (Optional[str]): Error description,
if applicable.
Raises:
RuntimeError: If the verifier is not yet set up and automatic
setup fails. Verification errors and timeouts are reported
through the returned result rather than raised.
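Example:
A usage sketch, assuming ``verifier`` is an instance of a
concrete subclass:
```python
result = await verifier.verify("42", ground_truth="42")
if result.status == VerificationOutcome.SUCCESS:
    print(f"verified in {result.duration:.2f}s")
else:
    print(result.error_message)
```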
"""
if not self._is_setup:
logger.warning(
f"{self.__class__.__name__} not set up, calling setup()"
)
await self.setup()
attempt = 0
start_time = time.time()
while attempt < self._max_retries:
# Extract verifiable part of the proposed solution,
# if verifier has been initialized with extractor.
verifiable_solution = (
await self.extractor.extract(solution)
if self.extractor
else solution
)
if not verifiable_solution:
return VerificationResult(
status=VerificationOutcome.ERROR,
result="",
error_message="Extraction produced no verifiable content.",
duration=time.time() - start_time,
metadata={"attempt": attempt},
)
try:
verification_result = (
await asyncio.wait_for(
self._verify_implementation(
verifiable_solution, ground_truth
),
timeout=self._timeout,
)
if self._timeout
else await self._verify_implementation(
verifiable_solution, ground_truth
)
)
verification_result.duration = time.time() - start_time
verification_result.metadata["attempt"] = attempt + 1
return verification_result
except asyncio.TimeoutError:
attempt += 1
if attempt == self._max_retries:
return VerificationResult(
status=VerificationOutcome.TIMEOUT,
result="",
error_message="Verification timed out "
"after all retries.",
duration=time.time() - start_time,
metadata={"attempt": attempt},
)
logger.warning(
f"Verification timeout on attempt {attempt}, retrying..."
)
await asyncio.sleep(self._retry_delay)
except Exception as e:
attempt += 1
if attempt == self._max_retries:
return VerificationResult(
status=VerificationOutcome.ERROR,
result="",
error_message=f"Verification failed: {e!s}",
duration=time.time() - start_time,
metadata={"attempt": attempt},
)
logger.warning(
f"Verification error on attempt {attempt}, retrying: {e!s}"
)
await asyncio.sleep(self._retry_delay)
return VerificationResult(
status=VerificationOutcome.ERROR,
result="",
error_message="Unexpected code path reached",
duration=time.time() - start_time,
metadata={"attempt": attempt},
)
@abstractmethod
async def _verify_implementation(
self, solution: str, ground_truth: Optional[str]
) -> VerificationResult:
r"""Abstract method for verification logic.
Subclasses must implement this method to define how the solution
should be processed, evaluated, and compared to the ground truth.
Args:
solution (str): The generated response requiring verification.
ground_truth (Optional[str]): The expected reference output.
Returns:
VerificationResult: Contains verification status and details.
Raises:
NotImplementedError: If the method is not implemented
in a subclass.
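Example:
A minimal sketch of a subclass implementation; the
``ExactMatchVerifier`` name is illustrative only, and ``_setup`` /
``_cleanup`` are omitted for brevity:
```python
class ExactMatchVerifier(BaseVerifier):
    async def _verify_implementation(
        self, solution: str, ground_truth: Optional[str]
    ) -> VerificationResult:
        matched = (
            ground_truth is not None
            and solution.strip() == ground_truth.strip()
        )
        return VerificationResult(
            status=VerificationOutcome.SUCCESS
            if matched
            else VerificationOutcome.FAILURE,
            result=solution,
        )
```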
"""
raise NotImplementedError(
"Subclasses must implement _verify_implementation()"
)
# TODO: check again
async def verify_batch(
self,
solutions: List[str],
ground_truths: List[Optional[str]],
raise_on_error: bool = False,
) -> List[VerificationResult]:
r"""Verify multiple solutions in parallel with controlled concurrency.
This method verifies multiple generated solutions against their
respective ground truths using parallel execution. It handles
timeouts, execution errors, and batch processing optimizations.
Args:
solutions (List[str]): A list of generated solutions to be
verified.
ground_truths (List[Optional[str]]): A list of expected outputs for
comparison. Each element corresponds to a solution.
raise_on_error (bool, optional): If True, raises an exception if
any verification fails. (default: :obj:`False`)
Returns:
List[VerificationResult]: A list of verification results, one per
input solution.
Raises:
ValueError: If `solutions` and `ground_truths` differ in length.
RuntimeError: If any verification errors out or times out and
`raise_on_error` is True.
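Example:
A usage sketch verifying two solutions against their references,
assuming ``verifier`` is an instance of a concrete subclass:
```python
results = await verifier.verify_batch(
    solutions=["42", "7"],
    ground_truths=["42", "8"],
)
statuses = [r.status for r in results]
```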
"""
if not self._is_setup:
logger.warning(
f"{self.__class__.__name__} not set up, calling setup()"
)
await self.setup()
if len(solutions) != len(ground_truths):
raise ValueError(
"Number of solutions and ground truths must match: "
f"{len(solutions)} != {len(ground_truths)}"
)
# Retrieve batch processing settings
max_workers = getattr(
self._batch_processor, 'max_workers', self._max_parallel or 1
)
batch_size = getattr(
self._batch_processor, 'batch_size', self._initial_batch_size or 10
)
semaphore = asyncio.Semaphore(max(1, max_workers))
async def _verify_with_semaphore(
solution: str, ground_truth: Optional[str]
) -> VerificationResult:
start_time = time.time()
try:
async with semaphore:
verification_result = await self.verify(
solution, ground_truth
)
processing_time = time.time() - start_time
success = (
verification_result.status == VerificationOutcome.SUCCESS
)
self._batch_processor.adjust_batch_size(
success, processing_time
)
return verification_result
except Exception as e:
processing_time = time.time() - start_time
self._batch_processor.adjust_batch_size(False, processing_time)
logger.error(f"Verification failed: {e!s}", exc_info=True)
return VerificationResult(
status=VerificationOutcome.ERROR,
result="",
error_message=str(e),
metadata={"error_type": type(e).__name__},
)
# Process in batches
all_results: List[VerificationResult] = []
for i in range(0, len(solutions), batch_size):
batch_solutions = solutions[i : i + batch_size]
batch_ground_truths = ground_truths[i : i + batch_size]
verification_tasks = [
_verify_with_semaphore(solution, ground_truth)
for solution, ground_truth in zip(
batch_solutions, batch_ground_truths
)
]
try:
batch_results = await asyncio.gather(*verification_tasks)
all_results.extend(batch_results)
except Exception as e:
logger.error(
f"Batch verification failed: {e!s}", exc_info=True
)
if raise_on_error:
raise RuntimeError(
f"Batch verification failed: {e!s}"
) from e
if raise_on_error and any(
r.status
in {VerificationOutcome.ERROR, VerificationOutcome.TIMEOUT}
for r in all_results
):
error_msg = "One or more verifications failed"
logger.error(error_msg)
raise RuntimeError(error_msg)
return all_results