"""Python Valkey Dict module."""
from typing import Any, Iterator, Tuple, Union, Optional, List, Dict
import time
from datetime import timedelta
from redis import StrictRedis
from .core import ValkeyDict
[docs]
class PythonValkeyDict(ValkeyDict):
    """Python dictionary with Valkey as backend.

    With support for advanced features, such as custom data types, pipelining, and key expiration.

    This class focuses on having one-to-one behavior of a dictionary while using Valkey as storage layer, allowing
    for efficient storage and retrieval of key-value pairs. It supports various data types, including
    strings, integers, floats, lists, dictionaries, tuples, sets, and user-defined types. The class
    leverages the power of Valkey pipelining to minimize network round-trip time, latency, and I/O load,
    thereby optimizing performance for batch operations. Additionally, it allows for the management of
    key expiration through the use of context managers.

    The ValkeyDict class is designed to be analogous to a standard Python dictionary while providing
    enhanced functionality, such as support for a wider range of data types and efficient batch operations.
    It aims to offer a seamless and familiar interface for developers familiar with Python dictionaries,
    enabling a smooth transition to a Valkey-backed data store.

    Dictionary compliance: this subclass records every stored key in a Valkey sorted set
    (``valkey-dict-insertion-order-<namespace>``) scored by insertion time, and uses that index for
    ``len()``, key iteration and ``popitem()``. Deleting a key that does not exist raises ``KeyError``.

    Extendable Types: You can extend ValkeyDict by adding or overriding encoding and decoding functions.
    This functionality enables various use cases, such as managing encrypted data in Valkey.
    To implement this, simply create and register your custom encoding and decoding functions.
    By delegating serialization to valkey-dict, you reduce complexity and keep the code in your codebase simple.
    """
[docs]
def __init__(self,
             namespace: str = 'main',
             expire: Union[int, timedelta, None] = None,
             preserve_expiration: Optional[bool] = False,
             valkey: "Optional[StrictRedis[Any]]" = None,
             **valkey_kwargs: Any) -> None:  # noqa: D202 pydocstyle clashes with Sphinx
    """
    Initialize a PythonValkeyDict instance.

    Forwards every argument to the base class, always with
    ``raise_key_error_delete=True``, and then derives the name of the
    Valkey key that holds the insertion-order index for this namespace.

    Args:
        namespace (str): A prefix for keys stored in Valkey.
        expire (Union[int, timedelta, None], optional): Expiration time for keys.
        preserve_expiration (Optional[bool], optional): Preserve expiration on key updates.
        valkey (Optional[StrictRedis[Any]], optional): A Valkey connection instance.
        **valkey_kwargs (Any): Additional kwargs for Valkey connection if not provided.
    """
    base_options: Dict[str, Any] = {
        "namespace": namespace,
        "expire": expire,
        "preserve_expiration": preserve_expiration,
        "valkey": valkey,
        # Dictionary-compliant mode: the base class is asked to raise on
        # deleting a missing key.
        "raise_key_error_delete": True,
    }
    super().__init__(**base_options, **valkey_kwargs)

    # Name of the sorted set that tracks the insertion order of this namespace.
    self._insertion_order_key = f"valkey-dict-insertion-order-{namespace}"
def __delitem__(self, key: str) -> None:
"""
Delete the value associated with the given key, analogous to a dictionary.
For distributed systems, we intentionally don't raise KeyError when the key doesn't exist.
This ensures identical code running across different systems won't randomly fail
when another system already achieved the deletion goal (key not existing).
Warning:
Setting dict_compliant=True will raise KeyError when key doesn't exist.
This is not recommended for distributed systems as it can cause KeyErrors
that are hard to debug when multiple systems interact with the same keys.
Args:
key (str): The key to delete
Raises:
KeyError: Only if dict_compliant=True and key doesn't exist
"""
formatted_key = self._format_key(key)
result = self.valkey.delete(formatted_key)
self._insertion_order_delete(formatted_key)
if not result:
raise KeyError(key)
def _store(self, key: str, value: Any) -> None:
"""
Store a value in Valkey with the given key.
Args:
key (str): The key to store the value.
value (Any): The value to be stored.
Raises:
ValueError: If the value or key fail validation.
Note: Validity checks could be refactored to allow for custom exceptions that inherit from ValueError,
providing detailed information about why a specific validation failed.
This would enable users to specify which validity checks should be executed, add custom validity functions,
and choose whether to fail on validation errors, or drop the data and only issue a warning and continue.
Example use case is caching, to cache data only when it's between min and max sizes.
Allowing for simple dict set operation, but only cache data that makes sense.
"""
if not self._valid_input(value) or not self._valid_input(key):
raise ValueError("Invalid input value or key size exceeded the maximum limit.")
formatted_key = self._format_key(key)
formatted_value = self._format_value(value)
with self.pipeline():
self._insertion_order_add(formatted_key)
self._store_set(formatted_key, formatted_value)
[docs]
def setdefault(self, key: str, default_value: Optional[Any] = None) -> Any:
    """Return the value under ``key``, storing ``default_value`` first if it is absent.

    Analogous to a dictionary's ``setdefault``. The command built by
    ``_create_set_get_command`` is sent directly to the server (presumably
    a conditional set that returns the previous value; confirm against the
    base class). The key is then recorded in the insertion-order index
    regardless of the reply.

    Args:
        key (str): The key to retrieve the value.
        default_value (Optional[Any], optional): The value to set if the key is not found.

    Returns:
        Any: ``default_value`` when the server replied with ``None``,
        otherwise the decoded value that was already stored.
    """
    storage_key = self._format_key(key)
    encoded_default = self._format_value(default_value)

    # Todo bind both commands
    command_args, command_options = self._create_set_get_command(storage_key, encoded_default)
    previous = self.get_valkey.execute_command(*command_args, **command_options)
    self._insertion_order_add(storage_key)

    return default_value if previous is None else self._transform(previous)
def __len__(self) -> int:
"""
Get the number of items in the ValkeyDict, analogous to a dictionary.
Returns:
int: The number of items in the ValkeyDict.
"""
return self._insertion_order_len()
def _scan_keys(self, search_term: str = '', full_scan: bool = False) -> Iterator[str]:
return self._insertion_order_iter()
[docs]
def clear(self) -> None:
    """Remove all key-value pairs from the ValkeyDict in one batch operation using pipelining.

    This method mimics the behavior of the `clear` method from a standard Python dictionary.
    Valkey pipelining is employed to group multiple commands into a single request, minimizing
    network round-trip time, latency, and I/O load, thereby enhancing the overall performance.

    The tracked keys are read from the insertion-order index before any
    delete is issued. The index is the only source of the key list, so
    reading it after it has been cleared would leave the data keys behind;
    taking the snapshot first makes the result independent of when the
    queued deletes are actually executed.
    """
    # Snapshot first: the index is about to be removed.
    tracked_keys = list(self._scan_keys(full_scan=True))

    with self.pipeline():
        self._insertion_order_clear()
        for formatted_key in tracked_keys:
            self.valkey.delete(formatted_key)
[docs]
def popitem(self) -> Tuple[str, Any]:
    """Remove and return the most recently inserted (key, value) pair.

    This method is analogous to the `popitem` method of a standard Python
    dictionary in Python 3.7+, which removes items in LIFO order. The key
    is taken from the insertion-order index.

    Returns:
        tuple: The (key, value) pair of the latest tracked key, with the
        key parsed back to its user-facing form and the value decoded.

    Raises:
        KeyError: If ValkeyDict is empty.
    """
    newest = self._insertion_order_latest()
    if newest is None:
        raise KeyError("popitem(): dictionary is empty")

    plain_key = self._parse_key(newest)
    value = self._transform(self._pop(newest))
    return plain_key, value
def _pop(self, formatted_key: str) -> Any:
"""
Remove the value associated with the given key and return it.
Or return the default value if the key is not found.
Args:
formatted_key (str): The formatted key to remove the value.
Returns:
Any: The value associated with the key or the default value.
"""
# TODO bind both commands
self._insertion_order_delete(formatted_key)
return self.get_valkey.execute_command("GETDEL", formatted_key)
[docs]
def multi_get(self, _key: str) -> List[Any]:
    """
    Unsupported operation; always raises.

    Args:
        _key (str): Not used.

    Raises:
        NotImplementedError: Not part of Python Valkey Dict.
    """
    reason = "Not part of PythonValkeyDict"
    raise NotImplementedError(reason)
[docs]
def multi_chain_get(self, _keys: List[str]) -> List[Any]:
    """
    Unsupported operation; always raises.

    Args:
        _keys (List[str]): Not used.

    Raises:
        NotImplementedError: Not part of Python Valkey Dict.
    """
    reason = "Not part of PythonValkeyDict"
    raise NotImplementedError(reason)
[docs]
def multi_dict(self, _key: str) -> Dict[str, Any]:
    """
    Unsupported operation; always raises.

    Args:
        _key (str): Not used.

    Raises:
        NotImplementedError: Not part of Python Valkey Dict.
    """
    reason = "Not part of PythonValkeyDict"
    raise NotImplementedError(reason)
[docs]
def multi_del(self, _key: str) -> int:
    """
    Unsupported operation; always raises.

    Args:
        _key (str): Not used.

    Raises:
        NotImplementedError: Not part of Python Valkey Dict.
    """
    reason = "Not part of PythonValkeyDict"
    raise NotImplementedError(reason)
def _insertion_order_add(self, formatted_key: str) -> bool:
"""Record a key's insertion into the dictionary.
This private method updates the insertion order tracking when a new key is added
to the dictionary.
Args:
formatted_key (str): The key being added to the dictionary.
Returns:
bool: True if the insertion order was updated, False otherwise.
"""
return bool(self.valkey.zadd(self._insertion_order_key, {formatted_key: time.time()}))
def _insertion_order_delete(self, formatted_key: str) -> bool:
"""Remove a key from the insertion order tracking.
This private method updates the insertion order tracking when a key is removed
from the dictionary.
Args:
formatted_key (str): The key being removed from the dictionary.
Returns:
bool: True if the insertion order was updated, False otherwise.
"""
return bool(self.valkey.zrem(self._insertion_order_key, formatted_key))
def _insertion_order_iter(self) -> Iterator[str]:
"""Create an iterator for dictionary keys in their insertion order.
This private method allows for iterating over the dictionary's keys in the order
they were inserted.
Yields:
str: Keys in their insertion order.
"""
# TODO add full_scan boolean and search terms.
first = True
cursor = -1
while cursor != 0:
if first:
cursor = 0
first = False
cursor, data = self.get_valkey.zscan(
name=self._insertion_order_key,
cursor=cursor,
count=1
)
yield from (item[0] for item in data)
def _insertion_order_clear(self) -> bool:
"""Clear all insertion order information.
This private method resets the insertion order tracking for the dictionary.
Returns:
bool: True if the insertion order was successfully cleared, False otherwise.
"""
return bool(self.valkey.delete(self._insertion_order_key))
def _insertion_order_len(self) -> int:
"""Get the number of keys in the insertion order tracking.
This private method returns the count of keys being tracked for insertion order.
Returns:
int: The number of keys in the insertion order tracking.
"""
return self.get_valkey.zcard(self._insertion_order_key)
def _insertion_order_latest(self) -> Union[str, None]:
"""Get the most recently inserted key in the dictionary.
This private method retrieves the key that was most recently added to the dictionary.
Returns:
Union[str, None]: The most recently inserted key, or None if the dictionary is empty.
"""
result = self.valkey.zrange(self._insertion_order_key, -1, -1)
return result[0] if result else None