import asyncio
import itertools
import typing
from hazelcast.aggregator import Aggregator
from hazelcast.config import IndexUtil, IndexType, IndexConfig
from hazelcast.core import SimpleEntryView
from hazelcast.projection import Projection
from hazelcast.protocol import PagingPredicateHolder
from hazelcast.protocol.codec import (
map_add_entry_listener_codec,
map_add_entry_listener_to_key_codec,
map_add_entry_listener_with_predicate_codec,
map_add_entry_listener_to_key_with_predicate_codec,
map_clear_codec,
map_contains_key_codec,
map_contains_value_codec,
map_delete_codec,
map_entry_set_codec,
map_entries_with_predicate_codec,
map_evict_codec,
map_evict_all_codec,
map_flush_codec,
map_get_codec,
map_get_all_codec,
map_get_entry_view_codec,
map_is_empty_codec,
map_key_set_codec,
map_key_set_with_predicate_codec,
map_load_all_codec,
map_load_given_keys_codec,
map_put_codec,
map_put_all_codec,
map_put_if_absent_codec,
map_put_transient_codec,
map_size_codec,
map_remove_codec,
map_remove_if_same_codec,
map_remove_entry_listener_codec,
map_replace_codec,
map_replace_if_same_codec,
map_set_codec,
map_try_put_codec,
map_try_remove_codec,
map_values_codec,
map_values_with_predicate_codec,
map_add_interceptor_codec,
map_aggregate_codec,
map_aggregate_with_predicate_codec,
map_project_codec,
map_project_with_predicate_codec,
map_execute_on_all_keys_codec,
map_execute_on_key_codec,
map_execute_on_keys_codec,
map_execute_with_predicate_codec,
map_add_index_codec,
map_set_ttl_codec,
map_entries_with_paging_predicate_codec,
map_key_set_with_paging_predicate_codec,
map_values_with_paging_predicate_codec,
map_put_with_max_idle_codec,
map_put_if_absent_with_max_idle_codec,
map_put_transient_with_max_idle_codec,
map_set_with_max_idle_codec,
map_remove_interceptor_codec,
map_remove_all_codec,
map_add_near_cache_invalidation_listener_codec,
)
from hazelcast.internal.asyncio_proxy.base import (
Proxy,
EntryEvent,
EntryEventType,
get_entry_listener_flags,
)
from hazelcast.predicate import Predicate, _PagingPredicate
from hazelcast.serialization.data import Data
from hazelcast.types import AggregatorResultType, KeyType, ValueType, ProjectionType
from hazelcast.serialization.compact import SchemaNotReplicatedError
from hazelcast.util import (
check_not_none,
thread_id,
to_millis,
IterationType,
deserialize_entry_list_in_place,
deserialize_list_in_place,
)
EntryEventCallable = typing.Callable[[EntryEvent[KeyType, ValueType]], None]
[docs]
class Map(Proxy, typing.Generic[KeyType, ValueType]):
"""Hazelcast Map client proxy to access the map on the cluster.
Concurrent, distributed, observable and queryable map.
Example:
>>> my_map = await client.get_map("my_map")
>>> print("map.put", await my_map.put("key", "value"))
>>> print("map.contains_key", await my_map.contains_key("key"))
>>> print("map.get", await my_map.get("key"))
>>> print("map.size", await my_map.size())
This class does not allow ``None`` to be used as a key or value.
Warning:
Asyncio client map proxy is not thread-safe, do not access it from other threads.
"""
def __init__(self, service_name, name, context):
super(Map, self).__init__(service_name, name, context)
self._reference_id_generator = context.lock_reference_id_generator
[docs]
async def add_entry_listener(
self,
include_value: bool = False,
key: KeyType = None,
predicate: Predicate = None,
added_func: EntryEventCallable = None,
removed_func: EntryEventCallable = None,
updated_func: EntryEventCallable = None,
evicted_func: EntryEventCallable = None,
evict_all_func: EntryEventCallable = None,
clear_all_func: EntryEventCallable = None,
merged_func: EntryEventCallable = None,
expired_func: EntryEventCallable = None,
loaded_func: EntryEventCallable = None,
) -> str:
"""Adds a continuous entry listener for this map.
Listener will get notified for map events filtered with given
parameters.
The listener functions must not block.
Args:
include_value: Whether received event should include the value or
not.
key: Key for filtering the events.
predicate: Predicate for filtering the events.
added_func: Function to be called when an entry is added to map.
removed_func: Function to be called when an entry is removed from
map.
updated_func: Function to be called when an entry is updated.
evicted_func: Function to be called when an entry is evicted from
map.
evict_all_func: Function to be called when entries are evicted
from map.
clear_all_func: Function to be called when entries are cleared
from map.
merged_func: Function to be called when WAN replicated entry is
merged.
expired_func: Function to be called when an entry's live time is
expired.
loaded_func: Function to be called when an entry is loaded from a
map loader.
Returns:
A registration id which is used as a key to remove the listener.
"""
flags = get_entry_listener_flags(
ADDED=added_func,
REMOVED=removed_func,
UPDATED=updated_func,
EVICTED=evicted_func,
EXPIRED=expired_func,
EVICT_ALL=evict_all_func,
CLEAR_ALL=clear_all_func,
MERGED=merged_func,
LOADED=loaded_func,
)
if key is not None and predicate is not None:
try:
key_data = self._to_data(key)
predicate_data = self._to_data(predicate)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(
e,
self.add_entry_listener,
include_value,
key,
predicate,
added_func,
removed_func,
updated_func,
evicted_func,
evict_all_func,
clear_all_func,
merged_func,
expired_func,
loaded_func,
)
with_key_and_predicate_codec = map_add_entry_listener_to_key_with_predicate_codec
request = with_key_and_predicate_codec.encode_request(
self.name, key_data, predicate_data, include_value, flags, self._is_smart
)
response_decoder = with_key_and_predicate_codec.decode_response
event_message_handler = with_key_and_predicate_codec.handle
elif key is not None and predicate is None:
try:
key_data = self._to_data(key)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(
e,
self.add_entry_listener,
include_value,
key,
predicate,
added_func,
removed_func,
updated_func,
evicted_func,
evict_all_func,
clear_all_func,
merged_func,
expired_func,
loaded_func,
)
with_key_codec = map_add_entry_listener_to_key_codec
request = with_key_codec.encode_request(
self.name, key_data, include_value, flags, self._is_smart
)
response_decoder = with_key_codec.decode_response
event_message_handler = with_key_codec.handle
elif key is None and predicate is not None:
try:
predicate = self._to_data(predicate)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(
e,
self.add_entry_listener,
include_value,
key,
predicate,
added_func,
removed_func,
updated_func,
evicted_func,
evict_all_func,
clear_all_func,
merged_func,
expired_func,
loaded_func,
)
with_predicate_codec = map_add_entry_listener_with_predicate_codec
request = with_predicate_codec.encode_request(
self.name, predicate, include_value, flags, self._is_smart
)
response_decoder = with_predicate_codec.decode_response
event_message_handler = with_predicate_codec.handle
else:
codec = map_add_entry_listener_codec
request = codec.encode_request(self.name, include_value, flags, self._is_smart)
response_decoder = codec.decode_response
event_message_handler = codec.handle
def handle_event_entry(
key_data,
value_data,
old_value_data,
merging_value_data,
event_type,
uuid,
number_of_affected_entries,
):
event = EntryEvent(
self._to_object(key_data),
self._to_object(value_data),
self._to_object(old_value_data),
self._to_object(merging_value_data),
event_type,
uuid,
number_of_affected_entries,
)
if event.event_type == EntryEventType.ADDED:
if added_func:
added_func(event)
elif event.event_type == EntryEventType.REMOVED:
if removed_func:
removed_func(event)
elif event.event_type == EntryEventType.UPDATED:
if updated_func:
updated_func(event)
elif event.event_type == EntryEventType.EVICTED:
if evicted_func:
evicted_func(event)
elif event.event_type == EntryEventType.EVICT_ALL:
if evict_all_func:
evict_all_func(event)
elif event.event_type == EntryEventType.CLEAR_ALL:
if clear_all_func:
clear_all_func(event)
elif event.event_type == EntryEventType.MERGED:
if merged_func:
merged_func(event)
elif event.event_type == EntryEventType.EXPIRED:
if expired_func:
expired_func(event)
elif event.event_type == EntryEventType.LOADED:
if loaded_func:
loaded_func(event)
return await self._register_listener(
request,
lambda r: response_decoder(r),
lambda reg_id: map_remove_entry_listener_codec.encode_request(self.name, reg_id),
lambda m: event_message_handler(m, handle_event_entry),
)
[docs]
async def add_index(
self,
attributes: typing.Sequence[str] = None,
index_type: typing.Union[int, str] = IndexType.SORTED,
name: str = None,
bitmap_index_options: typing.Dict[str, typing.Any] = None,
) -> None:
"""Adds an index to this map for the specified entries so that queries
can run faster.
Example:
Let's say your map values are Employee objects.
>>> class Employee(IdentifiedDataSerializable):
>>> active = false
>>> age = None
>>> name = None
>>> #other fields
>>>
>>> #methods
If you query your values mostly based on age and active fields,
you should consider indexing these.
>>> employees = await client.get_map("employees")
>>> await employees.add_index(attributes=["age"]) # Sorted index for range queries
>>> await employees.add_index(attributes=["active"], index_type=IndexType.HASH)) # Hash index for equality predicates
Index attribute should either have a getter method or be public.
You should also make sure to add the indexes before adding
entries to this map.
Indexing time is executed in parallel on each partition by operation
threads. The Map is not blocked during this operation. The time taken
in proportional to the size of the Map and the number Members.
Until the index finishes being created, any searches for the attribute
will use a full Map scan, thus avoiding using a partially built index
and returning incorrect results.
Args:
attributes: List of indexed attributes.
index_type: Type of the index. By default, set to ``SORTED``.
name: Name of the index.
bitmap_index_options: Bitmap index options.
- **unique_key:** (str): The unique key attribute is used as a
source of values which uniquely identify each entry being
inserted into an index. Defaults to ``KEY_ATTRIBUTE_NAME``.
See the :class:`hazelcast.config.QueryConstants` for
possible values.
- **unique_key_transformation** (int|str): The transformation
is applied to every value extracted from the unique key
attribue. Defaults to ``OBJECT``. See the
:class:`hazelcast.config.UniqueKeyTransformation` for
possible values.
"""
d = {
"name": name,
"type": index_type,
"attributes": attributes,
"bitmap_index_options": bitmap_index_options,
}
config = IndexConfig.from_dict(d)
validated = IndexUtil.validate_and_normalize(self.name, config)
request = map_add_index_codec.encode_request(self.name, validated)
return await self._invoke(request)
[docs]
async def add_interceptor(self, interceptor: typing.Any) -> str:
"""Adds an interceptor for this map.
Added interceptor will intercept operations and execute user defined
methods.
Args:
interceptor: Interceptor for the map which includes user defined
methods.
Returns:
Id of registered interceptor.
"""
try:
interceptor_data = self._to_data(interceptor)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.add_interceptor, interceptor)
request = map_add_interceptor_codec.encode_request(self.name, interceptor_data)
return await self._invoke(request, map_add_interceptor_codec.decode_response)
[docs]
async def aggregate(
self, aggregator: Aggregator[AggregatorResultType], predicate: Predicate = None
) -> AggregatorResultType:
"""Applies the aggregation logic on map entries and filter the result
with the predicate, if given.
Args:
aggregator: Aggregator to aggregate the entries with.
predicate: Predicate to filter the entries with.
Returns:
The result of the aggregation.
"""
check_not_none(aggregator, "aggregator can't be none")
if predicate:
if isinstance(predicate, _PagingPredicate):
raise AssertionError("Paging predicate is not supported.")
try:
aggregator_data = self._to_data(aggregator)
predicate_data = self._to_data(predicate)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.aggregate, aggregator, predicate)
def handler(message):
return self._to_object(map_aggregate_with_predicate_codec.decode_response(message))
request = map_aggregate_with_predicate_codec.encode_request(
self.name, aggregator_data, predicate_data
)
else:
try:
aggregator_data = self._to_data(aggregator)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.aggregate, aggregator, predicate)
def handler(message):
return self._to_object(map_aggregate_codec.decode_response(message))
request = map_aggregate_codec.encode_request(self.name, aggregator_data)
return await self._invoke(request, handler)
[docs]
async def clear(self) -> None:
"""Clears the map.
The ``MAP_CLEARED`` event is fired for any registered listeners.
"""
request = map_clear_codec.encode_request(self.name)
return await self._invoke(request)
[docs]
async def contains_key(self, key: KeyType) -> bool:
"""Determines whether this map contains an entry with the key.
Warning:
This method uses ``__hash__`` and ``__eq__`` methods of binary form
of the key, not the actual implementations of ``__hash__`` and
``__eq__`` defined in key's class.
Args:
key: The specified key.
Returns:
``True`` if this map contains an entry for the specified key,
``False`` otherwise.
"""
check_not_none(key, "key can't be None")
try:
key_data = self._to_data(key)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.contains_key, key)
return await self._contains_key_internal(key_data)
[docs]
async def contains_value(self, value: ValueType) -> bool:
"""Determines whether this map contains one or more keys for the
specified value.
Args:
value: The specified value.
Returns:
``True`` if this map contains an entry for the specified value,
``False`` otherwise.
"""
check_not_none(value, "value can't be None")
try:
value_data = self._to_data(value)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.contains_value, value)
request = map_contains_value_codec.encode_request(self.name, value_data)
return await self._invoke(request, map_contains_value_codec.decode_response)
[docs]
async def delete(self, key: KeyType) -> None:
"""Removes the mapping for a key from this map if it is present
(optional operation).
Unlike remove(object), this operation does not return the removed
value, which avoids the serialization cost of the returned value.
If the removed value will not be used, a delete operation is preferred
over a remove operation for better performance.
The map will not contain a mapping for the specified key once the call
returns.
Warning:
This method breaks the contract of EntryListener.
When an entry is removed by delete(), it fires an ``EntryEvent``
with a ``None`` ``old_value``. Also, a listener with predicates
will have ``None`` values, so only the keys can be queried
via predicates.
Args:
key: Key of the mapping to be deleted.
"""
check_not_none(key, "key can't be None")
try:
key_data = self._to_data(key)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.delete, key)
return await self._delete_internal(key_data)
[docs]
async def entry_set(
self, predicate: Predicate = None
) -> typing.List[typing.Tuple[KeyType, ValueType]]:
"""Returns a list clone of the mappings contained in this map.
Warning:
The list is NOT backed by the map, so changes to the map are NOT
reflected in the list, and vice-versa.
Args:
predicate: Predicate for the map to filter entries.
Returns:
The list of key-value tuples in the map.
"""
if predicate:
if isinstance(predicate, _PagingPredicate):
predicate.iteration_type = IterationType.ENTRY
try:
holder = PagingPredicateHolder.of(predicate, self._to_data)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.entry_set, predicate)
def handler(message):
response = map_entries_with_paging_predicate_codec.decode_response(message)
predicate.anchor_list = response["anchor_data_list"].as_anchor_list(
self._to_object
)
entry_data_list = response["response"]
return deserialize_entry_list_in_place(entry_data_list, self._to_object)
request = map_entries_with_paging_predicate_codec.encode_request(self.name, holder)
else:
try:
predicate_data = self._to_data(predicate)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.entry_set, predicate)
def handler(message):
entry_data_list = map_entries_with_predicate_codec.decode_response(message)
return deserialize_entry_list_in_place(entry_data_list, self._to_object)
request = map_entries_with_predicate_codec.encode_request(self.name, predicate_data)
else:
def handler(message):
entry_data_list = map_entry_set_codec.decode_response(message)
return deserialize_entry_list_in_place(entry_data_list, self._to_object)
request = map_entry_set_codec.encode_request(self.name)
return await self._invoke(request, handler)
[docs]
async def evict(self, key: KeyType) -> bool:
"""Evicts the specified key from this map.
Warning:
This method uses ``__hash__`` and ``__eq__`` methods of binary form
of the key, not the actual implementations of ``__hash__`` and
``__eq__`` defined in key's class.
Args:
key: Key to evict.
Returns:
``True`` if the key is evicted, ``False`` otherwise.
"""
check_not_none(key, "key can't be None")
try:
key_data = self._to_data(key)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.evict, key)
return await self._evict_internal(key_data)
[docs]
async def evict_all(self) -> None:
"""Evicts all keys from this map except the locked ones.
The ``EVICT_ALL`` event is fired for any registered listeners.
"""
request = map_evict_all_codec.encode_request(self.name)
return await self._invoke(request)
[docs]
async def execute_on_entries(
self, entry_processor: typing.Any, predicate: Predicate | None = None
) -> typing.List[typing.Any]:
"""Applies the user defined EntryProcessor to all the entries in the
map or entries in the map which satisfies the predicate if provided.
Returns the results mapped by each key in the map.
Args:
entry_processor: A stateful serializable object which represents
the EntryProcessor defined on server side. This object must
have a serializable EntryProcessor counter part registered
on server side with the actual
``com.hazelcast.map.EntryProcessor`` implementation.
predicate: Predicate for filtering the entries.
Returns:
List of map entries which includes the keys and the results of the
entry process.
"""
if predicate:
try:
entry_processor_data = self._to_data(entry_processor)
predicate_data = self._to_data(predicate)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(
e, self.execute_on_entries, entry_processor, predicate
)
def handler(message):
entry_data_list = map_execute_with_predicate_codec.decode_response(message)
return deserialize_entry_list_in_place(entry_data_list, self._to_object)
request = map_execute_with_predicate_codec.encode_request(
self.name, entry_processor_data, predicate_data
)
else:
try:
entry_processor_data = self._to_data(entry_processor)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(
e, self.execute_on_entries, entry_processor, predicate
)
def handler(message):
entry_data_list = map_execute_on_all_keys_codec.decode_response(message)
return deserialize_entry_list_in_place(entry_data_list, self._to_object)
request = map_execute_on_all_keys_codec.encode_request(self.name, entry_processor_data)
return await self._invoke(request, handler)
[docs]
async def execute_on_key(self, key: KeyType, entry_processor: typing.Any) -> typing.Any:
"""Applies the user defined EntryProcessor to the entry mapped by the
key. Returns the object which is the result of EntryProcessor's
process method.
Args:
key: Specified key for the entry to be processed.
entry_processor: A stateful serializable object which represents
the EntryProcessor defined on server side. This object must
have a serializable EntryProcessor counter part registered on
server side with the actual
``com.hazelcast.map.EntryProcessor`` implementation.
Returns:
Result of entry process.
"""
check_not_none(key, "key can't be None")
try:
key_data = self._to_data(key)
entry_processor_data = self._to_data(entry_processor)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.execute_on_key, key, entry_processor)
return await self._execute_on_key_internal(key_data, entry_processor_data)
[docs]
async def execute_on_keys(
self, keys: typing.Sequence[KeyType], entry_processor: typing.Any
) -> typing.List[typing.Any]:
"""Applies the user defined EntryProcessor to the entries mapped by the
collection of keys. Returns the results mapped by each key in the
collection.
Args:
keys: Collection of the keys for the entries to be processed.
entry_processor: A stateful serializable object which represents
the EntryProcessor defined on server side. This object must
have a serializable EntryProcessor counter part registered on
server side with the actual
``com.hazelcast.map.EntryProcessor`` implementation.
Returns:
List of map entries which includes the keys and the results of the
entry process.
"""
if len(keys) == 0:
return []
try:
key_list = []
for key in keys:
check_not_none(key, "key can't be None")
key_list.append(self._to_data(key))
entry_processor_data = self._to_data(entry_processor)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.execute_on_keys, keys, entry_processor)
def handler(message):
entry_data_list = map_execute_on_keys_codec.decode_response(message)
return deserialize_entry_list_in_place(entry_data_list, self._to_object)
request = map_execute_on_keys_codec.encode_request(
self.name, entry_processor_data, key_list
)
return await self._invoke(request, handler)
[docs]
async def flush(self) -> None:
"""Flushes all the local dirty entries."""
request = map_flush_codec.encode_request(self.name)
return await self._invoke(request)
[docs]
async def get(self, key: KeyType) -> typing.Optional[ValueType]:
"""Returns the value for the specified key, or ``None`` if this map
does not contain this key.
Warning:
This method returns a clone of original value, modifying the
returned value does not change the actual value in the map. One
should put modified value back to make changes visible to all nodes.
>>> value = await my_map.get(key)
>>> value.update_some_property()
>>> await my_map.put(key,value)
Warning:
This method uses ``__hash__`` and ``__eq__`` methods of binary form
of the key, not the actual implementations of ``__hash__`` and
``__eq__`` defined in key's class.
Args:
key: The specified key.
Returns:
The value for the specified key.
"""
check_not_none(key, "key can't be None")
try:
key_data = self._to_data(key)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.get, key)
return await self._get_internal(key_data)
[docs]
async def get_all(self, keys: typing.Sequence[KeyType]) -> typing.Dict[KeyType, ValueType]:
"""Returns the entries for the given keys.
Warning:
The returned map is NOT backed by the original map, so changes to
the original map are NOT reflected in the returned map, and
vice-versa.
Warning:
This method uses ``__hash__`` and ``__eq__`` methods of binary form
of the key, not the actual implementations of ``__hash__`` and
``__eq__`` defined in key's class.
Args:
keys: Keys to get.
Returns:
Dictionary of map entries.
"""
check_not_none(keys, "keys can't be None")
if not keys:
return {}
partition_service = self._context.partition_service
partition_to_keys: typing.Dict[int, typing.Dict[KeyType, Data]] = {}
for key in keys:
check_not_none(key, "key can't be None")
try:
key_data = self._to_data(key)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.get_all, keys)
partition_id = partition_service.get_partition_id(key_data)
try:
partition_to_keys[partition_id][key] = key_data
except KeyError:
partition_to_keys[partition_id] = {key: key_data}
return await self._get_all_internal(partition_to_keys)
[docs]
async def get_entry_view(self, key: KeyType) -> SimpleEntryView[KeyType, ValueType]:
"""Returns the EntryView for the specified key.
Warning:
This method returns a clone of original mapping, modifying the
returned value does not change the actual value in the map. One
should put modified value back to make changes visible to all
nodes.
Warning:
This method uses ``__hash__`` and ``__eq__`` methods of binary form
of the key, not the actual implementations of ``__hash__`` and
``__eq__`` defined in key's class.
Args:
key: The key of the entry.
Returns:
EntryView of the specified key.
"""
check_not_none(key, "key can't be None")
try:
key_data = self._to_data(key)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.get_entry_view, key)
def handler(message):
response = map_get_entry_view_codec.decode_response(message)
entry_view = response["response"]
if not entry_view:
return None
entry_view.key = self._to_object(entry_view.key)
entry_view.value = self._to_object(entry_view.value)
return entry_view
request = map_get_entry_view_codec.encode_request(self.name, key_data, thread_id())
return await self._invoke_on_key(request, key_data, handler)
[docs]
async def is_empty(self) -> bool:
"""Returns whether this map contains no key-value mappings or not.
Returns:
``True`` if this map contains no key-value mappings, ``False``
otherwise.
"""
request = map_is_empty_codec.encode_request(self.name)
return await self._invoke(request, map_is_empty_codec.decode_response)
[docs]
async def key_set(self, predicate: Predicate | None = None) -> typing.List[ValueType]:
"""Returns a List clone of the keys contained in this map or the keys
of the entries filtered with the predicate if provided.
Warning:
The list is NOT backed by the map, so changes to the map are NOT
reflected in the list, and vice-versa.
Args:
predicate: Predicate to filter the entries.
Returns:
A list of the clone of the keys.
"""
if predicate:
if isinstance(predicate, _PagingPredicate):
predicate.iteration_type = IterationType.KEY
try:
holder = PagingPredicateHolder.of(predicate, self._to_data)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.key_set, predicate)
def handler(message):
response = map_key_set_with_paging_predicate_codec.decode_response(message)
predicate.anchor_list = response["anchor_data_list"].as_anchor_list(
self._to_object
)
data_list = response["response"]
return deserialize_list_in_place(data_list, self._to_object)
request = map_key_set_with_paging_predicate_codec.encode_request(self.name, holder)
else:
try:
predicate_data = self._to_data(predicate)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.key_set, predicate)
def handler(message):
data_list = map_key_set_with_predicate_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)
request = map_key_set_with_predicate_codec.encode_request(self.name, predicate_data)
else:
def handler(message):
data_list = map_key_set_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)
request = map_key_set_codec.encode_request(self.name)
return await self._invoke(request, handler)
[docs]
async def load_all(
self, keys: typing.Sequence[KeyType] = None, replace_existing_values: bool = True
) -> None:
"""Loads all keys from the store at server side or loads the given
keys if provided.
Args:
keys: Keys of the entry values to load.
replace_existing_values: Whether the existing values will be
replaced or not with those loaded from the server side
MapLoader.
"""
if keys:
try:
key_data_list = [self._to_data(key) for key in keys]
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(
e, self.load_all, keys, replace_existing_values
)
return await self._load_all_internal(key_data_list, replace_existing_values)
request = map_load_all_codec.encode_request(self.name, replace_existing_values)
return await self._invoke(request)
[docs]
async def project(
self, projection: Projection[ProjectionType], predicate: Predicate = None
) -> ProjectionType:
"""Applies the projection logic on map entries and filter the result
with the predicate, if given.
Args:
projection: Projection to project the entries with.
predicate: Predicate to filter the entries with.
Returns:
The result of the projection.
"""
check_not_none(projection, "Projection can't be none")
if predicate:
if isinstance(predicate, _PagingPredicate):
raise AssertionError("Paging predicate is not supported.")
try:
projection_data = self._to_data(projection)
predicate_data = self._to_data(predicate)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.project, projection, predicate)
def handler(message):
data_list = map_project_with_predicate_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)
request = map_project_with_predicate_codec.encode_request(
self.name, projection_data, predicate_data
)
else:
try:
projection_data = self._to_data(projection)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.project, projection, predicate)
def handler(message):
data_list = map_project_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)
request = map_project_codec.encode_request(self.name, projection_data)
return await self._invoke(request, handler)
[docs]
async def put(
self, key: KeyType, value: ValueType, ttl: float = None, max_idle: float = None
) -> typing.Optional[ValueType]:
"""Associates the specified value with the specified key in this map.
If the map previously contained a mapping for the key, the old value is
replaced by the specified value. If ttl is provided, entry will expire
and get evicted after the ttl.
Warning:
This method returns a clone of the previous value, not the original
(identically equal) value previously put into the map.
Warning:
This method uses ``__hash__`` and ``__eq__`` methods of binary form
of the key, not the actual implementations of ``__hash__`` and
``__eq__`` defined in key's class.
Args:
key: The specified key.
value: The value to associate with the key.
ttl: Maximum time in seconds for this entry to stay in the map. If
not provided, the value configured on the server side
configuration will be used. Setting this to ``0`` means
infinite time-to-live.
max_idle: Maximum time in seconds for this entry to stay idle in
the map. If not provided, the value configured on the server
side configuration will be used. Setting this to ``0`` means
infinite max idle time.
Returns:
Previous value associated with key or ``None`` if there was no
mapping for key.
"""
check_not_none(key, "key can't be None")
check_not_none(value, "value can't be None")
try:
key_data = self._to_data(key)
value_data = self._to_data(value)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.put, key, value, ttl, max_idle)
return await self._put_internal(key_data, value_data, ttl, max_idle)
[docs]
async def put_all(self, map: typing.Dict[KeyType, ValueType]) -> None:
"""Copies all the mappings from the specified map to this map.
No atomicity guarantees are given. In the case of a failure, some
key-value tuples may get written, while others are not.
Args:
map: Dictionary which includes mappings to be stored in this map.
"""
check_not_none(map, "map can't be None")
if not map:
return None
partition_service = self._context.partition_service
partition_map: typing.Dict[int, typing.List[typing.Tuple[Data, Data]]] = {}
for key, value in map.items():
check_not_none(key, "key can't be None")
check_not_none(value, "value can't be None")
try:
entry = (self._to_data(key), self._to_data(value))
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.put_all, map)
partition_id = partition_service.get_partition_id(entry[0])
try:
partition_map[partition_id].append(entry)
except KeyError:
partition_map[partition_id] = [entry]
async with asyncio.TaskGroup() as tg: # type: ignore[attr-defined]
for partition_id, entry_list in partition_map.items():
request = map_put_all_codec.encode_request(
self.name, entry_list, False
) # TODO trigger map loader
tg.create_task(self._ainvoke_on_partition(request, partition_id))
return None
[docs]
async def put_if_absent(
self, key: KeyType, value: ValueType, ttl: float = None, max_idle: float = None
) -> typing.Optional[ValueType]:
"""Associates the specified key with the given value if it is not
already associated.
If ttl is provided, entry will expire and get evicted after the ttl.
This is equivalent to below, except that the action is performed
atomically:
>>> if not (await my_map.contains_key(key)):
>>> return await my_map.put(key,value)
>>> else:
>>> return await my_map.get(key)
Warning:
This method returns a clone of the previous value, not the original
(identically equal) value previously put into the map.
Warning:
This method uses ``__hash__`` and ``__eq__`` methods of binary form
of the key, not the actual implementations of ``__hash__`` and
``__eq__`` defined in key's class.
Args:
key: Key of the entry.
value: Value of the entry.
ttl: Maximum time in seconds for this entry to stay in the map. If
not provided, the value configured on the server side
configuration will be used. Setting this to ``0`` means
infinite time-to-live.
max_idle: Maximum time in seconds for this entry to stay idle in
the map. If not provided, the value configured on the server
side configuration will be used. Setting this to ``0`` means
infinite max idle time.
Returns:
Old value of the entry.
"""
check_not_none(key, "key can't be None")
check_not_none(value, "value can't be None")
try:
key_data = self._to_data(key)
value_data = self._to_data(value)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(
e, self.put_if_absent, key, value, ttl, max_idle
)
return await self._put_if_absent_internal(key_data, value_data, ttl, max_idle)
[docs]
async def put_transient(
self, key: KeyType, value: ValueType, ttl: float = None, max_idle: float = None
) -> None:
"""Same as ``put``, but MapStore defined at the server side will not
be called.
Warning:
This method uses ``__hash__`` and ``__eq__`` methods of binary form
of the key, not the actual implementations of ``__hash__`` and
``__eq__`` defined in key's class.
Args:
key: Key of the entry.
value: Value of the entry.
ttl: Maximum time in seconds for this entry to stay in the map. If
not provided, the value configured on the server side
configuration will be used. Setting this to ``0`` means
infinite time-to-live.
max_idle: Maximum time in seconds for this entry to stay idle in
the map. If not provided, the value configured on the server
side configuration will be used. Setting this to ``0`` means
infinite max idle time.
"""
check_not_none(key, "key can't be None")
check_not_none(value, "value can't be None")
try:
key_data = self._to_data(key)
value_data = self._to_data(value)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(
e, self.put_transient, key, value, ttl, max_idle
)
return await self._put_transient_internal(key_data, value_data, ttl, max_idle)
[docs]
async def remove(self, key: KeyType) -> typing.Optional[ValueType]:
"""Removes the mapping for a key from this map if it is present.
The map will not contain a mapping for the specified key once the call
returns.
Warning:
This method uses ``__hash__`` and ``__eq__`` methods of binary form
of the key, not the actual implementations of ``__hash__`` and
``__eq__`` defined in key's class.
Args:
key: Key of the mapping to be deleted.
Returns:
The previous value associated with key, or ``None`` if there was
no mapping for key.
"""
check_not_none(key, "key can't be None")
try:
key_data = self._to_data(key)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.remove, key)
return await self._remove_internal(key_data)
[docs]
async def remove_all(self, predicate: Predicate) -> None:
"""Removes all entries which match with the supplied predicate.
Args:
predicate: Used to select entries to be removed from map.
"""
check_not_none(predicate, "predicate can't be None")
try:
predicate_data = self._to_data(predicate)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.remove_all, predicate)
return await self._remove_all_internal(predicate_data)
[docs]
async def remove_if_same(self, key: KeyType, value: ValueType) -> bool:
"""Removes the entry for a key only if it is currently mapped to a
given value.
This is equivalent to below, except that the action is performed
atomically:
>>> if (await my_map.contains_key(key)) and (await my_map.get(key) == value):
>>> await my_map.remove(key)
>>> return True
>>> else:
>>> return False
Warning:
This method uses ``__hash__`` and ``__eq__`` methods of binary form
of the key, not the actual implementations of ``__hash__`` and
``__eq__`` defined in key's class.
Args:
key: The specified key.
value: Remove the key if it has this value.
Returns:
``True`` if the value was removed, ``False`` otherwise.
"""
check_not_none(key, "key can't be None")
check_not_none(value, "value can't be None")
try:
key_data = self._to_data(key)
value_data = self._to_data(value)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.remove_if_same, key, value)
return await self._remove_if_same_internal_(key_data, value_data)
[docs]
async def remove_entry_listener(self, registration_id: str) -> bool:
"""Removes the specified entry listener.
Returns silently if there is no such listener added before.
Args:
registration_id: Id of registered listener.
Returns:
``True`` if registration is removed, ``False`` otherwise.
"""
return await self._deregister_listener(registration_id)
[docs]
async def remove_interceptor(self, registration_id: str) -> bool:
"""Removes the given interceptor for this map, so it will not intercept
operations anymore.
Args:
registration_id: Registration ID of the map interceptor.
Returns:
``True`` if the interceptor is removed, ``False`` otherwise.
"""
check_not_none(registration_id, "Interceptor registration id should not be None")
request = map_remove_interceptor_codec.encode_request(self.name, registration_id)
return await self._invoke(request, map_remove_interceptor_codec.decode_response)
[docs]
async def replace(self, key: KeyType, value: ValueType) -> typing.Optional[ValueType]:
"""Replaces the entry for a key only if it is currently mapped to some
value.
This is equivalent to below, except that the action is performed
atomically:
>>> if await my_map.contains_key(key):
>>> return await my_map.put(key,value)
>>> else:
>>> return None
Warning:
This method uses ``__hash__`` and ``__eq__`` methods of binary form
of the key, not the actual implementations of ``__hash__`` and
``__eq__`` defined in key's class.
Warning:
This method returns a clone of the previous value, not the original
(identically equal) value previously put into the map.
Args:
key: The specified key.
value: The value to replace the previous value.
Returns:
Previous value associated with key, or ``None`` if there was no
mapping for key.
"""
check_not_none(key, "key can't be None")
check_not_none(value, "value can't be None")
try:
key_data = self._to_data(key)
value_data = self._to_data(value)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.replace, key, value)
return await self._replace_internal(key_data, value_data)
[docs]
async def replace_if_same(
self, key: ValueType, old_value: ValueType, new_value: ValueType
) -> bool:
"""Replaces the entry for a key only if it is currently mapped to a
given value.
This is equivalent to below, except that the action is performed
atomically:
>>> if (await my_map.contains_key(key)) and (await my_map.get(key) == old_value):
>>> await my_map.put(key, new_value)
>>> return True
>>> else:
>>> return False
Warning:
This method uses ``__hash__`` and ``__eq__`` methods of binary form
of the key, not the actual implementations of ``__hash__`` and
``__eq__`` defined in key's class.
Args:
key: The specified key.
old_value: Replace the key value if it is the old value.
new_value: The new value to replace the old value.
Returns:
``True`` if the value was replaced, ``False`` otherwise.
"""
check_not_none(key, "key can't be None")
check_not_none(old_value, "old_value can't be None")
check_not_none(new_value, "new_value can't be None")
try:
key_data = self._to_data(key)
old_value_data = self._to_data(old_value)
new_value_data = self._to_data(new_value)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(
e, self.replace_if_same, key, old_value, new_value
)
return await self._replace_if_same_internal(key_data, old_value_data, new_value_data)
[docs]
async def set(
self, key: KeyType, value: ValueType, ttl: float = None, max_idle: float = None
) -> None:
"""Puts an entry into this map.
Similar to the put operation except that set doesn't return the old
value, which is more efficient. If ttl is provided, entry will expire
and get evicted after the ttl.
Warning:
This method uses ``__hash__`` and ``__eq__`` methods of binary form
of the key, not the actual implementations of ``__hash__`` and
``__eq__`` defined in key's class.
Args:
key: Key of the entry.
value: Value of the entry.
ttl: Maximum time in seconds for this entry to stay in the map. If
not provided, the value configured on the server side
configuration will be used. Setting this to ``0`` means
infinite time-to-live.
max_idle: Maximum time in seconds for this entry to stay idle in
the map. If not provided, the value configured on the server
side configuration will be used. Setting this to ``0`` means
infinite max idle time.
"""
check_not_none(key, "key can't be None")
check_not_none(value, "value can't be None")
try:
key_data = self._to_data(key)
value_data = self._to_data(value)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.set, key, value, ttl, max_idle)
return await self._set_internal(key_data, value_data, ttl, max_idle)
[docs]
async def set_ttl(self, key: KeyType, ttl: float) -> None:
"""Updates the TTL (time to live) value of the entry specified by the
given key with a new TTL value.
New TTL value is valid starting from the time this operation is
invoked, not since the time the entry was created. If the entry does
not exist or is already expired, this call has no effect.
Args:
key: The key of the map entry.
ttl: Maximum time in seconds for this entry to stay in the map.
Setting this to ``0`` means infinite time-to-live.
"""
check_not_none(key, "key can't be None")
check_not_none(ttl, "ttl can't be None")
try:
key_data = self._to_data(key)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.set_ttl, key, ttl)
return await self._set_ttl_internal(key_data, ttl)
[docs]
async def size(self) -> int:
"""Returns the number of entries in this map.
Returns:
Number of entries in this map.
"""
request = map_size_codec.encode_request(self.name)
return await self._invoke(request, map_size_codec.decode_response)
[docs]
async def try_put(self, key: KeyType, value: ValueType, timeout: float = 0) -> bool:
"""Tries to put the given key and value into this map and returns
immediately if timeout is not provided.
If timeout is provided, operation waits until it is completed or
timeout is reached.
Args:
key: Key of the entry.
value: Value of the entry.
timeout: Maximum time in seconds to wait.
Returns:
``True`` if the put is successful, ``False`` otherwise.
"""
check_not_none(key, "key can't be None")
check_not_none(value, "value can't be None")
try:
key_data = self._to_data(key)
value_data = self._to_data(value)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.try_put, key, value, timeout)
return await self._try_put_internal(key_data, value_data, timeout)
[docs]
async def try_remove(self, key: KeyType, timeout: float = 0) -> bool:
"""Tries to remove the given key from this map and returns immediately
if timeout is not provided.
If timeout is provided, operation waits until it is completed or
timeout is reached.
Args:
key: Key of the entry to be deleted.
timeout: Maximum time in seconds to wait.
Returns:
``True`` if the remove is successful, ``False`` otherwise.
"""
check_not_none(key, "key can't be None")
try:
key_data = self._to_data(key)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.try_remove, key, timeout)
return await self._try_remove_internal(key_data, timeout)
[docs]
async def values(self, predicate: Predicate = None) -> typing.List[ValueType]:
"""Returns a list clone of the values contained in this map or values
of the entries which are filtered with the predicate if provided.
Warning:
The list is NOT backed by the map, so changes to the map are NOT
reflected in the list, and vice-versa.
Args:
predicate: Predicate to filter the entries.
Returns:
A list of clone of the values contained in this map.
"""
if predicate:
if isinstance(predicate, _PagingPredicate):
predicate.iteration_type = IterationType.VALUE
try:
holder = PagingPredicateHolder.of(predicate, self._to_data)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.values, predicate)
def handler(message):
response = map_values_with_paging_predicate_codec.decode_response(message)
predicate.anchor_list = response["anchor_data_list"].as_anchor_list(
self._to_object
)
data_list = response["response"]
return deserialize_list_in_place(data_list, self._to_object)
request = map_values_with_paging_predicate_codec.encode_request(self.name, holder)
else:
try:
predicate_data = self._to_data(predicate)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.values, predicate)
def handler(message):
data_list = map_values_with_predicate_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)
request = map_values_with_predicate_codec.encode_request(self.name, predicate_data)
else:
def handler(message):
data_list = map_values_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)
request = map_values_codec.encode_request(self.name)
return await self._invoke(request, handler)
def _contains_key_internal(self, key_data):
request = map_contains_key_codec.encode_request(self.name, key_data, thread_id())
return self._invoke_on_key(request, key_data, map_contains_key_codec.decode_response)
def _get_internal(self, key_data):
def handler(message):
return self._to_object(map_get_codec.decode_response(message))
request = map_get_codec.encode_request(self.name, key_data, thread_id())
return self._invoke_on_key(request, key_data, handler)
async def _get_all_internal(self, partition_to_keys, tasks=None):
def handler(message):
entry_data_list = map_get_all_codec.decode_response(message)
return deserialize_entry_list_in_place(entry_data_list, self._to_object)
tasks = tasks or []
async with asyncio.TaskGroup() as tg:
for partition_id, key_dict in partition_to_keys.items():
request = map_get_all_codec.encode_request(self.name, key_dict.values())
task = tg.create_task(self._ainvoke_on_partition(request, partition_id, handler))
tasks.append(task)
kvs = itertools.chain.from_iterable(task.result() for task in tasks)
return dict(kvs)
def _remove_internal(self, key_data):
def handler(message):
return self._to_object(map_remove_codec.decode_response(message))
request = map_remove_codec.encode_request(self.name, key_data, thread_id())
return self._invoke_on_key(request, key_data, handler)
def _remove_all_internal(self, predicate_data):
request = map_remove_all_codec.encode_request(self.name, predicate_data)
return self._invoke(request)
def _remove_if_same_internal_(self, key_data, value_data):
request = map_remove_if_same_codec.encode_request(
self.name, key_data, value_data, thread_id()
)
return self._invoke_on_key(
request, key_data, response_handler=map_remove_if_same_codec.decode_response
)
def _delete_internal(self, key_data):
request = map_delete_codec.encode_request(self.name, key_data, thread_id())
return self._invoke_on_key(request, key_data)
async def _put_internal(self, key_data, value_data, ttl, max_idle):
def handler(message):
return self._to_object(map_put_codec.decode_response(message))
if max_idle is not None:
request = map_put_with_max_idle_codec.encode_request(
self.name, key_data, value_data, thread_id(), to_millis(ttl), to_millis(max_idle)
)
else:
request = map_put_codec.encode_request(
self.name, key_data, value_data, thread_id(), to_millis(ttl)
)
return await self._invoke_on_key(request, key_data, handler)
def _set_internal(self, key_data, value_data, ttl, max_idle):
if max_idle is not None:
request = map_set_with_max_idle_codec.encode_request(
self.name, key_data, value_data, thread_id(), to_millis(ttl), to_millis(max_idle)
)
else:
request = map_set_codec.encode_request(
self.name, key_data, value_data, thread_id(), to_millis(ttl)
)
return self._invoke_on_key(request, key_data)
def _set_ttl_internal(self, key_data, ttl):
request = map_set_ttl_codec.encode_request(self.name, key_data, to_millis(ttl))
return self._invoke_on_key(request, key_data, map_set_ttl_codec.decode_response)
def _try_remove_internal(self, key_data, timeout):
request = map_try_remove_codec.encode_request(
self.name, key_data, thread_id(), to_millis(timeout)
)
return self._invoke_on_key(request, key_data, map_try_remove_codec.decode_response)
def _try_put_internal(self, key_data, value_data, timeout):
request = map_try_put_codec.encode_request(
self.name, key_data, value_data, thread_id(), to_millis(timeout)
)
return self._invoke_on_key(request, key_data, map_try_put_codec.decode_response)
def _put_transient_internal(self, key_data, value_data, ttl, max_idle):
if max_idle is not None:
request = map_put_transient_with_max_idle_codec.encode_request(
self.name, key_data, value_data, thread_id(), to_millis(ttl), to_millis(max_idle)
)
else:
request = map_put_transient_codec.encode_request(
self.name, key_data, value_data, thread_id(), to_millis(ttl)
)
return self._invoke_on_key(request, key_data)
def _put_if_absent_internal(self, key_data, value_data, ttl, max_idle):
def handler(message):
return self._to_object(map_put_if_absent_codec.decode_response(message))
if max_idle is not None:
request = map_put_if_absent_with_max_idle_codec.encode_request(
self.name, key_data, value_data, thread_id(), to_millis(ttl), to_millis(max_idle)
)
else:
request = map_put_if_absent_codec.encode_request(
self.name, key_data, value_data, thread_id(), to_millis(ttl)
)
return self._invoke_on_key(request, key_data, handler)
def _replace_if_same_internal(self, key_data, old_value_data, new_value_data):
request = map_replace_if_same_codec.encode_request(
self.name, key_data, old_value_data, new_value_data, thread_id()
)
return self._invoke_on_key(request, key_data, map_replace_if_same_codec.decode_response)
def _replace_internal(self, key_data, value_data):
def handler(message):
return self._to_object(map_replace_codec.decode_response(message))
request = map_replace_codec.encode_request(self.name, key_data, value_data, thread_id())
return self._invoke_on_key(request, key_data, handler)
def _evict_internal(self, key_data):
request = map_evict_codec.encode_request(self.name, key_data, thread_id())
return self._invoke_on_key(request, key_data, map_evict_codec.decode_response)
def _load_all_internal(self, key_data_list, replace_existing_values):
request = map_load_given_keys_codec.encode_request(
self.name, key_data_list, replace_existing_values
)
return self._invoke(request)
def _execute_on_key_internal(self, key_data, entry_processor_data):
def handler(message):
return self._to_object(map_execute_on_key_codec.decode_response(message))
request = map_execute_on_key_codec.encode_request(
self.name, entry_processor_data, key_data, thread_id()
)
return self._invoke_on_key(request, key_data, handler)
class MapFeatNearCache(Map[KeyType, ValueType]):
"""Map proxy implementation featuring Near Cache"""
def __init__(self, service_name, name, context):
super(MapFeatNearCache, self).__init__(service_name, name, context)
self._invalidation_listener_id = None
self._near_cache = context.near_cache_manager.get_or_create_near_cache(name)
async def clear(self):
self._near_cache._clear()
return await super(MapFeatNearCache, self).clear()
async def evict_all(self):
self._near_cache.clear()
return await super(MapFeatNearCache, self).evict_all()
async def load_all(self, keys=None, replace_existing_values=True):
if keys is None and replace_existing_values:
self._near_cache.clear()
return await super(MapFeatNearCache, self).load_all(keys, replace_existing_values)
async def _on_destroy(self):
await self._remove_near_cache_invalidation_listener()
self._near_cache.clear()
await super(MapFeatNearCache, self)._on_destroy()
async def _add_near_cache_invalidation_listener(self):
codec = map_add_near_cache_invalidation_listener_codec
request = codec.encode_request(self.name, EntryEventType.INVALIDATION, self._is_smart)
self._invalidation_listener_id = await self._register_listener(
request,
lambda r: codec.decode_response(r),
lambda reg_id: map_remove_entry_listener_codec.encode_request(self.name, reg_id),
lambda m: codec.handle(m, self._handle_invalidation, self._handle_batch_invalidation),
)
async def _remove_near_cache_invalidation_listener(self):
if self._invalidation_listener_id:
await self.remove_entry_listener(self._invalidation_listener_id)
def _handle_invalidation(self, key, source_uuid, partition_uuid, sequence):
# key is always ``Data``
# null key means near cache has to remove all entries in it.
# see MapAddNearCacheEntryListenerMessageTask.
if key is None:
self._near_cache._clear()
else:
self._invalidate_cache(key)
def _handle_batch_invalidation(self, keys, source_uuids, partition_uuids, sequences):
# key_list is always list of ``Data``
for key_data in keys:
self._invalidate_cache(key_data)
def _invalidate_cache(self, key_data):
self._near_cache._invalidate(key_data)
def _invalidate_cache_batch(self, key_data_list):
for key_data in key_data_list:
self._near_cache._invalidate(key_data)
# internals
async def _contains_key_internal(self, key_data):
try:
return self._near_cache[key_data]
except KeyError:
return await super(MapFeatNearCache, self)._contains_key_internal(key_data)
async def _get_internal(self, key_data):
try:
return self._near_cache[key_data]
except KeyError:
value = await super(MapFeatNearCache, self)._get_internal(key_data)
self._near_cache.__setitem__(key_data, value)
return value
async def _get_all_internal(self, partition_to_keys, tasks=None):
tasks = tasks or []
for key_dic in partition_to_keys.values():
for key in list(key_dic.keys()):
try:
key_data = key_dic[key]
value = self._near_cache[key_data]
future = asyncio.Future()
future.set_result((key, value))
tasks.append(future)
del key_dic[key]
except KeyError:
pass
return await super(MapFeatNearCache, self)._get_all_internal(partition_to_keys, tasks)
def _try_remove_internal(self, key_data, timeout):
self._invalidate_cache(key_data)
return super(MapFeatNearCache, self)._try_remove_internal(key_data, timeout)
def _try_put_internal(self, key_data, value_data, timeout):
self._invalidate_cache(key_data)
return super(MapFeatNearCache, self)._try_put_internal(key_data, value_data, timeout)
def _set_internal(self, key_data, value_data, ttl, max_idle):
self._invalidate_cache(key_data)
return super(MapFeatNearCache, self)._set_internal(key_data, value_data, ttl, max_idle)
def _set_ttl_internal(self, key_data, ttl):
self._invalidate_cache(key_data)
return super(MapFeatNearCache, self)._set_ttl_internal(key_data, ttl)
def _replace_internal(self, key_data, value_data):
self._invalidate_cache(key_data)
return super(MapFeatNearCache, self)._replace_internal(key_data, value_data)
def _replace_if_same_internal(self, key_data, old_value_data, new_value_data):
self._invalidate_cache(key_data)
return super(MapFeatNearCache, self)._replace_if_same_internal(
key_data, old_value_data, new_value_data
)
def _remove_internal(self, key_data):
self._invalidate_cache(key_data)
return super(MapFeatNearCache, self)._remove_internal(key_data)
def _remove_all_internal(self, predicate_data):
self._near_cache.clear()
return super(MapFeatNearCache, self)._remove_all_internal(predicate_data)
def _remove_if_same_internal_(self, key_data, value_data):
self._invalidate_cache(key_data)
return super(MapFeatNearCache, self)._remove_if_same_internal_(key_data, value_data)
def _put_transient_internal(self, key_data, value_data, ttl, max_idle):
self._invalidate_cache(key_data)
return super(MapFeatNearCache, self)._put_transient_internal(
key_data, value_data, ttl, max_idle
)
async def _put_internal(self, key_data, value_data, ttl, max_idle):
self._invalidate_cache(key_data)
return await super(MapFeatNearCache, self)._put_internal(
key_data, value_data, ttl, max_idle
)
def _put_if_absent_internal(self, key_data, value_data, ttl, max_idle):
self._invalidate_cache(key_data)
return super(MapFeatNearCache, self)._put_if_absent_internal(
key_data, value_data, ttl, max_idle
)
def _load_all_internal(self, key_data_list, replace_existing_values):
self._invalidate_cache_batch(key_data_list)
return super(MapFeatNearCache, self)._load_all_internal(
key_data_list, replace_existing_values
)
def _execute_on_key_internal(self, key_data, entry_processor_data):
self._invalidate_cache(key_data)
return super(MapFeatNearCache, self)._execute_on_key_internal(
key_data, entry_processor_data
)
def _evict_internal(self, key_data):
self._invalidate_cache(key_data)
return super(MapFeatNearCache, self)._evict_internal(key_data)
def _delete_internal(self, key_data):
self._invalidate_cache(key_data)
return super(MapFeatNearCache, self)._delete_internal(key_data)
async def create_map_proxy(service_name, name, context):
near_cache_config = context.config.near_caches.get(name, None)
if near_cache_config is None:
return Map(service_name, name, context)
nc = MapFeatNearCache(service_name, name, context)
if nc._near_cache.invalidate_on_change:
await nc._add_near_cache_invalidation_listener()
return nc