# Source code for camel.extractors.base

# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========

import asyncio
from abc import ABC, abstractmethod
from types import TracebackType
from typing import Any, Dict, List, Optional, Type

from camel.logger import get_logger
from camel.utils import BatchProcessor

logger = get_logger(__name__)


class BaseExtractorStrategy(ABC):
    r"""Abstract base class for extraction strategies.

    Concrete strategies implement :meth:`extract` to pull a single
    normalized value out of a raw text response.
    """

    @abstractmethod
    async def extract(self, text: str) -> Optional[str]:
        r"""Asynchronously extracts relevant parts from text.

        Args:
            text (str): The input text to process.

        Returns:
            Optional[str]: Extracted str if successful, otherwise None.
        """
        ...
class BaseExtractor:
    r"""Base class for response extractors with a fixed strategy pipeline.

    This extractor:
    - Uses a **fixed multi-stage pipeline** of extraction strategies.
    - Tries **each strategy in order** within a stage until one succeeds.
    - Feeds the **output of one stage into the next** for processing.
    - Supports **async execution** for efficient processing.
    - Provides **batch processing and resource monitoring** options.
    """

    def __init__(
        self,
        pipeline: "List[List[BaseExtractorStrategy]]",
        cache_templates: bool = True,
        max_cache_size: int = 1000,
        extraction_timeout: float = 30.0,
        batch_size: int = 10,
        monitoring_interval: float = 5.0,
        cpu_threshold: float = 80.0,
        memory_threshold: float = 85.0,
        **kwargs,
    ):
        r"""Initialize the extractor with a multi-stage strategy pipeline.

        Args:
            pipeline (List[List[BaseExtractorStrategy]]): A fixed list of
                lists where each list represents a stage containing extractor
                strategies executed in order.
            cache_templates (bool): Whether to cache extraction templates.
                (default: :obj:`True`)
            max_cache_size (int): Maximum number of templates to cache.
                (default: :obj:`1000`)
            extraction_timeout (float): Maximum time for extraction in
                seconds. (default: :obj:`30.0`)
            batch_size (int): Size of batches for parallel extraction.
                (default: :obj:`10`)
            monitoring_interval (float): Interval in seconds between
                resource checks. (default: :obj:`5.0`)
            cpu_threshold (float): CPU usage percentage threshold for
                scaling down. (default: :obj:`80.0`)
            memory_threshold (float): Memory usage percentage threshold for
                scaling down. (default: :obj:`85.0`)
            **kwargs: Additional extractor parameters.
        """
        # Keep every tunable in a single metadata dict so subclasses can
        # extend configuration through **kwargs without new attributes.
        self._metadata = {
            'cache_templates': cache_templates,
            'max_cache_size': max_cache_size,
            'extraction_timeout': extraction_timeout,
            'batch_size': batch_size,
            'monitoring_interval': monitoring_interval,
            'cpu_threshold': cpu_threshold,
            'memory_threshold': memory_threshold,
            **kwargs,
        }
        self._is_setup = False
        # General-purpose cache; the template cache is created in setup().
        self._cache: Dict[str, Any] = {}
        self._batch_processor: Optional[BatchProcessor] = None
        self._pipeline = pipeline

    async def setup(self) -> None:
        r"""Set up the extractor with necessary resources.

        This method:
        1. Initializes template cache if enabled
        2. Sets up any parallel processing resources
        3. Validates extraction patterns

        Raises:
            RuntimeError: If initialization fails.
        """
        if self._is_setup:
            logger.debug(f"{self.__class__.__name__} already initialized")
            return

        try:
            if self._metadata["cache_templates"]:
                self._template_cache: Dict[str, Any] = {}

            # Only spin up a batch processor when batching is meaningful.
            if self._metadata["batch_size"] > 1:
                self._batch_processor = BatchProcessor(
                    initial_batch_size=self._metadata["batch_size"],
                    monitoring_interval=self._metadata["monitoring_interval"],
                    cpu_threshold=self._metadata["cpu_threshold"],
                    memory_threshold=self._metadata["memory_threshold"],
                )

            self._is_setup = True
            logger.info(f"{self.__class__.__name__} initialized successfully")

        except Exception as e:
            error_msg = f"Error during {self.__class__.__name__} setup: {e}"
            logger.error(error_msg)
            # Bug fix: cleanup() short-circuits on its `_is_setup` guard,
            # so a failed setup previously left partially created resources
            # (e.g. the batch processor) behind. Mark as set up first so
            # cleanup() actually tears them down and resets state.
            self._is_setup = True
            await self.cleanup()
            raise RuntimeError(error_msg) from e

    async def cleanup(self) -> None:
        r"""Clean up extractor resources.

        This method handles cleanup of resources and resets the extractor
        state. It ensures:
        1. All resources are properly released
        2. Template cache is cleared
        3. Parallel processing resources are shutdown
        4. State is reset to initial
        5. Cleanup happens even if errors occur

        Raises:
            RuntimeError: If cleanup fails (after resetting initialization
                state).
        """
        if not self._is_setup:
            logger.debug(
                f"{self.__class__.__name__} not initialized, skipping cleanup"
            )
            return

        # Collect errors instead of raising immediately so that every
        # cleanup step runs even if an earlier one fails.
        errors = []
        try:
            # Clear template cache
            if hasattr(self, '_template_cache'):
                try:
                    self._template_cache.clear()
                except Exception as e:
                    errors.append(f"Failed to clear template cache: {e}")

            # Shutdown parallel processing
            if self._batch_processor is not None:
                try:
                    # Get final performance metrics before cleanup
                    metrics = self._batch_processor.get_performance_metrics()
                    logger.info(f"Batch processor final metrics: {metrics}")
                except Exception as e:
                    errors.append(
                        f"Failed to get batch processor metrics: {e}"
                    )

            # Preserve init config in metadata
            if not errors:
                logger.info(
                    f"{self.__class__.__name__} cleaned up successfully"
                )
        except Exception as e:
            errors.append(f"Unexpected error during cleanup: {e}")
        finally:
            # Always reset state, even when individual steps failed.
            self._is_setup = False
            self._batch_processor = None

        if errors:
            error_msg = f"Errors during cleanup: {'; '.join(errors)}"
            logger.error(error_msg)
            raise RuntimeError(error_msg)

    async def __aenter__(self) -> "BaseExtractor":
        r"""Async context manager entry.

        Returns:
            BaseExtractor: The initialized extractor instance.
        """
        await self.setup()
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        r"""Async context manager exit.

        Args:
            exc_type (Optional[Type[BaseException]]): Exception type if an
                error occurred.
            exc_val (Optional[BaseException]): Exception value if an error
                occurred.
            exc_tb (Optional[TracebackType]): Exception traceback if an
                error occurred.
        """
        await self.cleanup()

    async def extract(self, response: str) -> Optional[str]:
        r"""Extracts a normalized, comparable part of the LLM response
        using the fixed multi-stage strategy pipeline.

        Args:
            response (str): The raw response text.

        Returns:
            Optional[str]: Extracted data if successful, otherwise None.

        Raises:
            ValueError: If response is empty or invalid.
            RuntimeError: If extractor is not initialized.
        """
        if not self._is_setup:
            raise RuntimeError(
                "Extractor must be initialized before extraction"
            )
        if not response or not response.strip():
            raise ValueError("Empty or whitespace-only response")

        current_input = response  # Initial input

        for stage in self._pipeline:
            stage_success = (
                False  # Track if any strategy in the stage succeeds
            )
            for strategy in stage:
                try:
                    # Apply the extraction timeout
                    result = await asyncio.wait_for(
                        strategy.extract(current_input),
                        timeout=self._metadata["extraction_timeout"],
                    )
                    if result is not None:
                        current_input = result  # Feed into next stage
                        stage_success = True
                        break  # Move to next stage if valid extraction occurs
                except asyncio.TimeoutError:
                    logger.warning(
                        f"Strategy {strategy.__class__.__name__} timed out "
                        f"after {self._metadata['extraction_timeout']} seconds"
                    )
                except Exception as e:
                    logger.warning(
                        f"Strategy {strategy.__class__.__name__} failed: {e}"
                    )

            if not stage_success:
                logger.debug(
                    "No strategy in stage succeeded, stopping extraction."
                )
                return None  # Stop processing if the stage fails

        return current_input  # Final processed output