Source code for camel.storages.key_value_storages.redis
# =========== Copyright 2023 @ CAMEL-AI.org. All Rights Reserved. ===========
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =========== Copyright 2023 @ CAMEL-AI.org. All Rights Reserved. ===========
import asyncio
import json
import logging
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from camel.storages.key_value_storages import BaseKeyValueStorage
if TYPE_CHECKING:
from redis.asyncio import Redis
logger = logging.getLogger(__name__)
[docs]
class RedisStorage(BaseKeyValueStorage):
r"""A concrete implementation of the :obj:`BaseCacheStorage` using Redis as
the backend. This is suitable for distributed cache systems that require
persistence and high availability.
"""
def __init__(
self,
sid: str,
url: str = "redis://localhost:6379",
loop: Optional[asyncio.AbstractEventLoop] = None,
**kwargs,
) -> None:
r"""Initializes the RedisStorage instance with the provided URL and
options.
Args:
sid (str): The ID for the storage instance to identify the
record space.
url (str): The URL for connecting to the Redis server.
**kwargs: Additional keyword arguments for Redis client
configuration.
Raises:
ImportError: If the `redis.asyncio` module is not installed.
"""
try:
import redis.asyncio as aredis
except ImportError as exc:
logger.error(
"Please install `redis` first. You can install it by "
"running `pip install redis`."
)
raise exc
self._client: Optional[aredis.Redis] = None
self._url = url
self._sid = sid
self._loop = loop or asyncio.get_event_loop()
self._create_client(**kwargs)
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
self._run_async(self.close())
[docs]
async def close(self) -> None:
r"""Closes the Redis client asynchronously."""
if self._client:
await self._client.close()
def _create_client(self, **kwargs) -> None:
r"""Creates the Redis client with the provided URL and options.
Args:
**kwargs: Additional keyword arguments for Redis client
configuration.
"""
import redis.asyncio as aredis
self._client = aredis.from_url(self._url, **kwargs)
@property
def client(self) -> Optional["Redis"]:
r"""Returns the Redis client instance.
Returns:
redis.asyncio.Redis: The Redis client instance.
"""
return self._client
[docs]
def save(
self, records: List[Dict[str, Any]], expire: Optional[int] = None
) -> None:
r"""Saves a batch of records to the key-value storage system."""
try:
self._run_async(self._async_save(records, expire))
except Exception as e:
logger.error(f"Error in save: {e}")
[docs]
def load(self) -> List[Dict[str, Any]]:
r"""Loads all stored records from the key-value storage system.
Returns:
List[Dict[str, Any]]: A list of dictionaries, where each dictionary
represents a stored record.
"""
try:
return self._run_async(self._async_load())
except Exception as e:
logger.error(f"Error in load: {e}")
return []
[docs]
def clear(self) -> None:
r"""Removes all records from the key-value storage system."""
try:
self._run_async(self._async_clear())
except Exception as e:
logger.error(f"Error in clear: {e}")
async def _async_save(
self, records: List[Dict[str, Any]], expire: Optional[int] = None
) -> None:
if self._client is None:
raise ValueError("Redis client is not initialized")
try:
value = json.dumps(records)
if expire:
await self._client.setex(self._sid, expire, value)
else:
await self._client.set(self._sid, value)
except Exception as e:
logger.error(f"Error saving records: {e}")
async def _async_load(self) -> List[Dict[str, Any]]:
if self._client is None:
raise ValueError("Redis client is not initialized")
try:
value = await self._client.get(self._sid)
if value:
return json.loads(value)
return []
except Exception as e:
logger.error(f"Error loading records: {e}")
return []
async def _async_clear(self) -> None:
if self._client is None:
raise ValueError("Redis client is not initialized")
try:
await self._client.delete(self._sid)
except Exception as e:
logger.error(f"Error clearing records: {e}")
def _run_async(self, coro):
if not self._loop.is_running():
return self._loop.run_until_complete(coro)
else:
future = asyncio.run_coroutine_threadsafe(coro, self._loop)
return future.result()