Source code for hazelcast.internal.asyncio_proxy.map

import asyncio
import itertools
import typing

from hazelcast.aggregator import Aggregator
from hazelcast.config import IndexUtil, IndexType, IndexConfig
from hazelcast.core import SimpleEntryView
from hazelcast.projection import Projection
from hazelcast.protocol import PagingPredicateHolder
from hazelcast.protocol.codec import (
    map_add_entry_listener_codec,
    map_add_entry_listener_to_key_codec,
    map_add_entry_listener_with_predicate_codec,
    map_add_entry_listener_to_key_with_predicate_codec,
    map_clear_codec,
    map_contains_key_codec,
    map_contains_value_codec,
    map_delete_codec,
    map_entry_set_codec,
    map_entries_with_predicate_codec,
    map_evict_codec,
    map_evict_all_codec,
    map_flush_codec,
    map_get_codec,
    map_get_all_codec,
    map_get_entry_view_codec,
    map_is_empty_codec,
    map_key_set_codec,
    map_key_set_with_predicate_codec,
    map_load_all_codec,
    map_load_given_keys_codec,
    map_put_codec,
    map_put_all_codec,
    map_put_if_absent_codec,
    map_put_transient_codec,
    map_size_codec,
    map_remove_codec,
    map_remove_if_same_codec,
    map_remove_entry_listener_codec,
    map_replace_codec,
    map_replace_if_same_codec,
    map_set_codec,
    map_try_put_codec,
    map_try_remove_codec,
    map_values_codec,
    map_values_with_predicate_codec,
    map_add_interceptor_codec,
    map_aggregate_codec,
    map_aggregate_with_predicate_codec,
    map_project_codec,
    map_project_with_predicate_codec,
    map_execute_on_all_keys_codec,
    map_execute_on_key_codec,
    map_execute_on_keys_codec,
    map_execute_with_predicate_codec,
    map_add_index_codec,
    map_set_ttl_codec,
    map_entries_with_paging_predicate_codec,
    map_key_set_with_paging_predicate_codec,
    map_values_with_paging_predicate_codec,
    map_put_with_max_idle_codec,
    map_put_if_absent_with_max_idle_codec,
    map_put_transient_with_max_idle_codec,
    map_set_with_max_idle_codec,
    map_remove_interceptor_codec,
    map_remove_all_codec,
    map_add_near_cache_invalidation_listener_codec,
)
from hazelcast.internal.asyncio_proxy.base import (
    Proxy,
    EntryEvent,
    EntryEventType,
    get_entry_listener_flags,
)
from hazelcast.predicate import Predicate, _PagingPredicate
from hazelcast.serialization.data import Data
from hazelcast.types import AggregatorResultType, KeyType, ValueType, ProjectionType
from hazelcast.serialization.compact import SchemaNotReplicatedError
from hazelcast.util import (
    check_not_none,
    thread_id,
    to_millis,
    IterationType,
    deserialize_entry_list_in_place,
    deserialize_list_in_place,
)


EntryEventCallable = typing.Callable[[EntryEvent[KeyType, ValueType]], None]


[docs] class Map(Proxy, typing.Generic[KeyType, ValueType]): """Hazelcast Map client proxy to access the map on the cluster. Concurrent, distributed, observable and queryable map. Example: >>> my_map = await client.get_map("my_map") >>> print("map.put", await my_map.put("key", "value")) >>> print("map.contains_key", await my_map.contains_key("key")) >>> print("map.get", await my_map.get("key")) >>> print("map.size", await my_map.size()) This class does not allow ``None`` to be used as a key or value. Warning: Asyncio client map proxy is not thread-safe, do not access it from other threads. """ def __init__(self, service_name, name, context): super(Map, self).__init__(service_name, name, context) self._reference_id_generator = context.lock_reference_id_generator
[docs] async def add_entry_listener( self, include_value: bool = False, key: KeyType = None, predicate: Predicate = None, added_func: EntryEventCallable = None, removed_func: EntryEventCallable = None, updated_func: EntryEventCallable = None, evicted_func: EntryEventCallable = None, evict_all_func: EntryEventCallable = None, clear_all_func: EntryEventCallable = None, merged_func: EntryEventCallable = None, expired_func: EntryEventCallable = None, loaded_func: EntryEventCallable = None, ) -> str: """Adds a continuous entry listener for this map. Listener will get notified for map events filtered with given parameters. The listener functions must not block. Args: include_value: Whether received event should include the value or not. key: Key for filtering the events. predicate: Predicate for filtering the events. added_func: Function to be called when an entry is added to map. removed_func: Function to be called when an entry is removed from map. updated_func: Function to be called when an entry is updated. evicted_func: Function to be called when an entry is evicted from map. evict_all_func: Function to be called when entries are evicted from map. clear_all_func: Function to be called when entries are cleared from map. merged_func: Function to be called when WAN replicated entry is merged. expired_func: Function to be called when an entry's live time is expired. loaded_func: Function to be called when an entry is loaded from a map loader. Returns: A registration id which is used as a key to remove the listener. """ flags = get_entry_listener_flags( ADDED=added_func, REMOVED=removed_func, UPDATED=updated_func, EVICTED=evicted_func, EXPIRED=expired_func, EVICT_ALL=evict_all_func, CLEAR_ALL=clear_all_func, MERGED=merged_func, LOADED=loaded_func, ) if key is not None and predicate is not None: try: key_data = self._to_data(key) predicate_data = self._to_data(predicate) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry( e, self.add_entry_listener, include_value, key, predicate, added_func, removed_func, updated_func, evicted_func, evict_all_func, clear_all_func, merged_func, expired_func, loaded_func, ) with_key_and_predicate_codec = map_add_entry_listener_to_key_with_predicate_codec request = with_key_and_predicate_codec.encode_request( self.name, key_data, predicate_data, include_value, flags, self._is_smart ) response_decoder = with_key_and_predicate_codec.decode_response event_message_handler = with_key_and_predicate_codec.handle elif key is not None and predicate is None: try: key_data = self._to_data(key) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry( e, self.add_entry_listener, include_value, key, predicate, added_func, removed_func, updated_func, evicted_func, evict_all_func, clear_all_func, merged_func, expired_func, loaded_func, ) with_key_codec = map_add_entry_listener_to_key_codec request = with_key_codec.encode_request( self.name, key_data, include_value, flags, self._is_smart ) response_decoder = with_key_codec.decode_response event_message_handler = with_key_codec.handle elif key is None and predicate is not None: try: predicate = self._to_data(predicate) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry( e, self.add_entry_listener, include_value, key, predicate, added_func, removed_func, updated_func, evicted_func, evict_all_func, clear_all_func, merged_func, expired_func, loaded_func, ) with_predicate_codec = map_add_entry_listener_with_predicate_codec request = with_predicate_codec.encode_request( self.name, predicate, include_value, flags, self._is_smart ) response_decoder = with_predicate_codec.decode_response event_message_handler = with_predicate_codec.handle else: codec = map_add_entry_listener_codec request = codec.encode_request(self.name, include_value, flags, self._is_smart) response_decoder = codec.decode_response event_message_handler = codec.handle def handle_event_entry( key_data, value_data, old_value_data, merging_value_data, event_type, uuid, number_of_affected_entries, ): event = EntryEvent( self._to_object(key_data), self._to_object(value_data), self._to_object(old_value_data), self._to_object(merging_value_data), event_type, uuid, number_of_affected_entries, ) if event.event_type == EntryEventType.ADDED: if added_func: added_func(event) elif event.event_type == EntryEventType.REMOVED: if removed_func: removed_func(event) elif event.event_type == EntryEventType.UPDATED: if updated_func: updated_func(event) elif event.event_type == EntryEventType.EVICTED: if evicted_func: evicted_func(event) elif event.event_type == EntryEventType.EVICT_ALL: if evict_all_func: evict_all_func(event) elif event.event_type == EntryEventType.CLEAR_ALL: if clear_all_func: clear_all_func(event) elif event.event_type == EntryEventType.MERGED: if merged_func: merged_func(event) elif event.event_type == EntryEventType.EXPIRED: if expired_func: expired_func(event) elif event.event_type == EntryEventType.LOADED: if loaded_func: loaded_func(event) return await self._register_listener( request, lambda r: response_decoder(r), lambda reg_id: map_remove_entry_listener_codec.encode_request(self.name, reg_id), lambda m: event_message_handler(m, handle_event_entry), )
[docs] async def add_index( self, attributes: typing.Sequence[str] = None, index_type: typing.Union[int, str] = IndexType.SORTED, name: str = None, bitmap_index_options: typing.Dict[str, typing.Any] = None, ) -> None: """Adds an index to this map for the specified entries so that queries can run faster. Example: Let's say your map values are Employee objects. >>> class Employee(IdentifiedDataSerializable): >>> active = false >>> age = None >>> name = None >>> #other fields >>> >>> #methods If you query your values mostly based on age and active fields, you should consider indexing these. >>> employees = await client.get_map("employees") >>> await employees.add_index(attributes=["age"]) # Sorted index for range queries >>> await employees.add_index(attributes=["active"], index_type=IndexType.HASH)) # Hash index for equality predicates Index attribute should either have a getter method or be public. You should also make sure to add the indexes before adding entries to this map. Indexing time is executed in parallel on each partition by operation threads. The Map is not blocked during this operation. The time taken in proportional to the size of the Map and the number Members. Until the index finishes being created, any searches for the attribute will use a full Map scan, thus avoiding using a partially built index and returning incorrect results. Args: attributes: List of indexed attributes. index_type: Type of the index. By default, set to ``SORTED``. name: Name of the index. bitmap_index_options: Bitmap index options. - **unique_key:** (str): The unique key attribute is used as a source of values which uniquely identify each entry being inserted into an index. Defaults to ``KEY_ATTRIBUTE_NAME``. See the :class:`hazelcast.config.QueryConstants` for possible values. - **unique_key_transformation** (int|str): The transformation is applied to every value extracted from the unique key attribue. Defaults to ``OBJECT``. See the :class:`hazelcast.config.UniqueKeyTransformation` for possible values. """ d = { "name": name, "type": index_type, "attributes": attributes, "bitmap_index_options": bitmap_index_options, } config = IndexConfig.from_dict(d) validated = IndexUtil.validate_and_normalize(self.name, config) request = map_add_index_codec.encode_request(self.name, validated) return await self._invoke(request)
[docs] async def add_interceptor(self, interceptor: typing.Any) -> str: """Adds an interceptor for this map. Added interceptor will intercept operations and execute user defined methods. Args: interceptor: Interceptor for the map which includes user defined methods. Returns: Id of registered interceptor. """ try: interceptor_data = self._to_data(interceptor) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.add_interceptor, interceptor) request = map_add_interceptor_codec.encode_request(self.name, interceptor_data) return await self._invoke(request, map_add_interceptor_codec.decode_response)
[docs] async def aggregate( self, aggregator: Aggregator[AggregatorResultType], predicate: Predicate = None ) -> AggregatorResultType: """Applies the aggregation logic on map entries and filter the result with the predicate, if given. Args: aggregator: Aggregator to aggregate the entries with. predicate: Predicate to filter the entries with. Returns: The result of the aggregation. """ check_not_none(aggregator, "aggregator can't be none") if predicate: if isinstance(predicate, _PagingPredicate): raise AssertionError("Paging predicate is not supported.") try: aggregator_data = self._to_data(aggregator) predicate_data = self._to_data(predicate) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.aggregate, aggregator, predicate) def handler(message): return self._to_object(map_aggregate_with_predicate_codec.decode_response(message)) request = map_aggregate_with_predicate_codec.encode_request( self.name, aggregator_data, predicate_data ) else: try: aggregator_data = self._to_data(aggregator) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.aggregate, aggregator, predicate) def handler(message): return self._to_object(map_aggregate_codec.decode_response(message)) request = map_aggregate_codec.encode_request(self.name, aggregator_data) return await self._invoke(request, handler)
[docs] async def clear(self) -> None: """Clears the map. The ``MAP_CLEARED`` event is fired for any registered listeners. """ request = map_clear_codec.encode_request(self.name) return await self._invoke(request)
[docs] async def contains_key(self, key: KeyType) -> bool: """Determines whether this map contains an entry with the key. Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: key: The specified key. Returns: ``True`` if this map contains an entry for the specified key, ``False`` otherwise. """ check_not_none(key, "key can't be None") try: key_data = self._to_data(key) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.contains_key, key) return await self._contains_key_internal(key_data)
[docs] async def contains_value(self, value: ValueType) -> bool: """Determines whether this map contains one or more keys for the specified value. Args: value: The specified value. Returns: ``True`` if this map contains an entry for the specified value, ``False`` otherwise. """ check_not_none(value, "value can't be None") try: value_data = self._to_data(value) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.contains_value, value) request = map_contains_value_codec.encode_request(self.name, value_data) return await self._invoke(request, map_contains_value_codec.decode_response)
[docs] async def delete(self, key: KeyType) -> None: """Removes the mapping for a key from this map if it is present (optional operation). Unlike remove(object), this operation does not return the removed value, which avoids the serialization cost of the returned value. If the removed value will not be used, a delete operation is preferred over a remove operation for better performance. The map will not contain a mapping for the specified key once the call returns. Warning: This method breaks the contract of EntryListener. When an entry is removed by delete(), it fires an ``EntryEvent`` with a ``None`` ``old_value``. Also, a listener with predicates will have ``None`` values, so only the keys can be queried via predicates. Args: key: Key of the mapping to be deleted. """ check_not_none(key, "key can't be None") try: key_data = self._to_data(key) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.delete, key) return await self._delete_internal(key_data)
[docs] async def entry_set( self, predicate: Predicate = None ) -> typing.List[typing.Tuple[KeyType, ValueType]]: """Returns a list clone of the mappings contained in this map. Warning: The list is NOT backed by the map, so changes to the map are NOT reflected in the list, and vice-versa. Args: predicate: Predicate for the map to filter entries. Returns: The list of key-value tuples in the map. """ if predicate: if isinstance(predicate, _PagingPredicate): predicate.iteration_type = IterationType.ENTRY try: holder = PagingPredicateHolder.of(predicate, self._to_data) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.entry_set, predicate) def handler(message): response = map_entries_with_paging_predicate_codec.decode_response(message) predicate.anchor_list = response["anchor_data_list"].as_anchor_list( self._to_object ) entry_data_list = response["response"] return deserialize_entry_list_in_place(entry_data_list, self._to_object) request = map_entries_with_paging_predicate_codec.encode_request(self.name, holder) else: try: predicate_data = self._to_data(predicate) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.entry_set, predicate) def handler(message): entry_data_list = map_entries_with_predicate_codec.decode_response(message) return deserialize_entry_list_in_place(entry_data_list, self._to_object) request = map_entries_with_predicate_codec.encode_request(self.name, predicate_data) else: def handler(message): entry_data_list = map_entry_set_codec.decode_response(message) return deserialize_entry_list_in_place(entry_data_list, self._to_object) request = map_entry_set_codec.encode_request(self.name) return await self._invoke(request, handler)
[docs] async def evict(self, key: KeyType) -> bool: """Evicts the specified key from this map. Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: key: Key to evict. Returns: ``True`` if the key is evicted, ``False`` otherwise. """ check_not_none(key, "key can't be None") try: key_data = self._to_data(key) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.evict, key) return await self._evict_internal(key_data)
[docs] async def evict_all(self) -> None: """Evicts all keys from this map except the locked ones. The ``EVICT_ALL`` event is fired for any registered listeners. """ request = map_evict_all_codec.encode_request(self.name) return await self._invoke(request)
[docs] async def execute_on_entries( self, entry_processor: typing.Any, predicate: Predicate | None = None ) -> typing.List[typing.Any]: """Applies the user defined EntryProcessor to all the entries in the map or entries in the map which satisfies the predicate if provided. Returns the results mapped by each key in the map. Args: entry_processor: A stateful serializable object which represents the EntryProcessor defined on server side. This object must have a serializable EntryProcessor counter part registered on server side with the actual ``com.hazelcast.map.EntryProcessor`` implementation. predicate: Predicate for filtering the entries. Returns: List of map entries which includes the keys and the results of the entry process. """ if predicate: try: entry_processor_data = self._to_data(entry_processor) predicate_data = self._to_data(predicate) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry( e, self.execute_on_entries, entry_processor, predicate ) def handler(message): entry_data_list = map_execute_with_predicate_codec.decode_response(message) return deserialize_entry_list_in_place(entry_data_list, self._to_object) request = map_execute_with_predicate_codec.encode_request( self.name, entry_processor_data, predicate_data ) else: try: entry_processor_data = self._to_data(entry_processor) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry( e, self.execute_on_entries, entry_processor, predicate ) def handler(message): entry_data_list = map_execute_on_all_keys_codec.decode_response(message) return deserialize_entry_list_in_place(entry_data_list, self._to_object) request = map_execute_on_all_keys_codec.encode_request(self.name, entry_processor_data) return await self._invoke(request, handler)
[docs] async def execute_on_key(self, key: KeyType, entry_processor: typing.Any) -> typing.Any: """Applies the user defined EntryProcessor to the entry mapped by the key. Returns the object which is the result of EntryProcessor's process method. Args: key: Specified key for the entry to be processed. entry_processor: A stateful serializable object which represents the EntryProcessor defined on server side. This object must have a serializable EntryProcessor counter part registered on server side with the actual ``com.hazelcast.map.EntryProcessor`` implementation. Returns: Result of entry process. """ check_not_none(key, "key can't be None") try: key_data = self._to_data(key) entry_processor_data = self._to_data(entry_processor) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.execute_on_key, key, entry_processor) return await self._execute_on_key_internal(key_data, entry_processor_data)
[docs] async def execute_on_keys( self, keys: typing.Sequence[KeyType], entry_processor: typing.Any ) -> typing.List[typing.Any]: """Applies the user defined EntryProcessor to the entries mapped by the collection of keys. Returns the results mapped by each key in the collection. Args: keys: Collection of the keys for the entries to be processed. entry_processor: A stateful serializable object which represents the EntryProcessor defined on server side. This object must have a serializable EntryProcessor counter part registered on server side with the actual ``com.hazelcast.map.EntryProcessor`` implementation. Returns: List of map entries which includes the keys and the results of the entry process. """ if len(keys) == 0: return [] try: key_list = [] for key in keys: check_not_none(key, "key can't be None") key_list.append(self._to_data(key)) entry_processor_data = self._to_data(entry_processor) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.execute_on_keys, keys, entry_processor) def handler(message): entry_data_list = map_execute_on_keys_codec.decode_response(message) return deserialize_entry_list_in_place(entry_data_list, self._to_object) request = map_execute_on_keys_codec.encode_request( self.name, entry_processor_data, key_list ) return await self._invoke(request, handler)
[docs] async def flush(self) -> None: """Flushes all the local dirty entries.""" request = map_flush_codec.encode_request(self.name) return await self._invoke(request)
[docs] async def get(self, key: KeyType) -> typing.Optional[ValueType]: """Returns the value for the specified key, or ``None`` if this map does not contain this key. Warning: This method returns a clone of original value, modifying the returned value does not change the actual value in the map. One should put modified value back to make changes visible to all nodes. >>> value = await my_map.get(key) >>> value.update_some_property() >>> await my_map.put(key,value) Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: key: The specified key. Returns: The value for the specified key. """ check_not_none(key, "key can't be None") try: key_data = self._to_data(key) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.get, key) return await self._get_internal(key_data)
[docs] async def get_all(self, keys: typing.Sequence[KeyType]) -> typing.Dict[KeyType, ValueType]: """Returns the entries for the given keys. Warning: The returned map is NOT backed by the original map, so changes to the original map are NOT reflected in the returned map, and vice-versa. Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: keys: Keys to get. Returns: Dictionary of map entries. """ check_not_none(keys, "keys can't be None") if not keys: return {} partition_service = self._context.partition_service partition_to_keys: typing.Dict[int, typing.Dict[KeyType, Data]] = {} for key in keys: check_not_none(key, "key can't be None") try: key_data = self._to_data(key) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.get_all, keys) partition_id = partition_service.get_partition_id(key_data) try: partition_to_keys[partition_id][key] = key_data except KeyError: partition_to_keys[partition_id] = {key: key_data} return await self._get_all_internal(partition_to_keys)
[docs] async def get_entry_view(self, key: KeyType) -> SimpleEntryView[KeyType, ValueType]: """Returns the EntryView for the specified key. Warning: This method returns a clone of original mapping, modifying the returned value does not change the actual value in the map. One should put modified value back to make changes visible to all nodes. Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: key: The key of the entry. Returns: EntryView of the specified key. """ check_not_none(key, "key can't be None") try: key_data = self._to_data(key) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.get_entry_view, key) def handler(message): response = map_get_entry_view_codec.decode_response(message) entry_view = response["response"] if not entry_view: return None entry_view.key = self._to_object(entry_view.key) entry_view.value = self._to_object(entry_view.value) return entry_view request = map_get_entry_view_codec.encode_request(self.name, key_data, thread_id()) return await self._invoke_on_key(request, key_data, handler)
[docs] async def is_empty(self) -> bool: """Returns whether this map contains no key-value mappings or not. Returns: ``True`` if this map contains no key-value mappings, ``False`` otherwise. """ request = map_is_empty_codec.encode_request(self.name) return await self._invoke(request, map_is_empty_codec.decode_response)
[docs] async def key_set(self, predicate: Predicate | None = None) -> typing.List[ValueType]: """Returns a List clone of the keys contained in this map or the keys of the entries filtered with the predicate if provided. Warning: The list is NOT backed by the map, so changes to the map are NOT reflected in the list, and vice-versa. Args: predicate: Predicate to filter the entries. Returns: A list of the clone of the keys. """ if predicate: if isinstance(predicate, _PagingPredicate): predicate.iteration_type = IterationType.KEY try: holder = PagingPredicateHolder.of(predicate, self._to_data) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.key_set, predicate) def handler(message): response = map_key_set_with_paging_predicate_codec.decode_response(message) predicate.anchor_list = response["anchor_data_list"].as_anchor_list( self._to_object ) data_list = response["response"] return deserialize_list_in_place(data_list, self._to_object) request = map_key_set_with_paging_predicate_codec.encode_request(self.name, holder) else: try: predicate_data = self._to_data(predicate) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.key_set, predicate) def handler(message): data_list = map_key_set_with_predicate_codec.decode_response(message) return deserialize_list_in_place(data_list, self._to_object) request = map_key_set_with_predicate_codec.encode_request(self.name, predicate_data) else: def handler(message): data_list = map_key_set_codec.decode_response(message) return deserialize_list_in_place(data_list, self._to_object) request = map_key_set_codec.encode_request(self.name) return await self._invoke(request, handler)
[docs] async def load_all( self, keys: typing.Sequence[KeyType] = None, replace_existing_values: bool = True ) -> None: """Loads all keys from the store at server side or loads the given keys if provided. Args: keys: Keys of the entry values to load. replace_existing_values: Whether the existing values will be replaced or not with those loaded from the server side MapLoader. """ if keys: try: key_data_list = [self._to_data(key) for key in keys] except SchemaNotReplicatedError as e: return await self._send_schema_and_retry( e, self.load_all, keys, replace_existing_values ) return await self._load_all_internal(key_data_list, replace_existing_values) request = map_load_all_codec.encode_request(self.name, replace_existing_values) return await self._invoke(request)
[docs] async def project( self, projection: Projection[ProjectionType], predicate: Predicate = None ) -> ProjectionType: """Applies the projection logic on map entries and filter the result with the predicate, if given. Args: projection: Projection to project the entries with. predicate: Predicate to filter the entries with. Returns: The result of the projection. """ check_not_none(projection, "Projection can't be none") if predicate: if isinstance(predicate, _PagingPredicate): raise AssertionError("Paging predicate is not supported.") try: projection_data = self._to_data(projection) predicate_data = self._to_data(predicate) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.project, projection, predicate) def handler(message): data_list = map_project_with_predicate_codec.decode_response(message) return deserialize_list_in_place(data_list, self._to_object) request = map_project_with_predicate_codec.encode_request( self.name, projection_data, predicate_data ) else: try: projection_data = self._to_data(projection) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.project, projection, predicate) def handler(message): data_list = map_project_codec.decode_response(message) return deserialize_list_in_place(data_list, self._to_object) request = map_project_codec.encode_request(self.name, projection_data) return await self._invoke(request, handler)
[docs] async def put( self, key: KeyType, value: ValueType, ttl: float = None, max_idle: float = None ) -> typing.Optional[ValueType]: """Associates the specified value with the specified key in this map. If the map previously contained a mapping for the key, the old value is replaced by the specified value. If ttl is provided, entry will expire and get evicted after the ttl. Warning: This method returns a clone of the previous value, not the original (identically equal) value previously put into the map. Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: key: The specified key. value: The value to associate with the key. ttl: Maximum time in seconds for this entry to stay in the map. If not provided, the value configured on the server side configuration will be used. Setting this to ``0`` means infinite time-to-live. max_idle: Maximum time in seconds for this entry to stay idle in the map. If not provided, the value configured on the server side configuration will be used. Setting this to ``0`` means infinite max idle time. Returns: Previous value associated with key or ``None`` if there was no mapping for key. """ check_not_none(key, "key can't be None") check_not_none(value, "value can't be None") try: key_data = self._to_data(key) value_data = self._to_data(value) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.put, key, value, ttl, max_idle) return await self._put_internal(key_data, value_data, ttl, max_idle)
[docs] async def put_all(self, map: typing.Dict[KeyType, ValueType]) -> None: """Copies all the mappings from the specified map to this map. No atomicity guarantees are given. In the case of a failure, some key-value tuples may get written, while others are not. Args: map: Dictionary which includes mappings to be stored in this map. """ check_not_none(map, "map can't be None") if not map: return None partition_service = self._context.partition_service partition_map: typing.Dict[int, typing.List[typing.Tuple[Data, Data]]] = {} for key, value in map.items(): check_not_none(key, "key can't be None") check_not_none(value, "value can't be None") try: entry = (self._to_data(key), self._to_data(value)) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.put_all, map) partition_id = partition_service.get_partition_id(entry[0]) try: partition_map[partition_id].append(entry) except KeyError: partition_map[partition_id] = [entry] async with asyncio.TaskGroup() as tg: # type: ignore[attr-defined] for partition_id, entry_list in partition_map.items(): request = map_put_all_codec.encode_request( self.name, entry_list, False ) # TODO trigger map loader tg.create_task(self._ainvoke_on_partition(request, partition_id)) return None
[docs] async def put_if_absent( self, key: KeyType, value: ValueType, ttl: float = None, max_idle: float = None ) -> typing.Optional[ValueType]: """Associates the specified key with the given value if it is not already associated. If ttl is provided, entry will expire and get evicted after the ttl. This is equivalent to below, except that the action is performed atomically: >>> if not (await my_map.contains_key(key)): >>> return await my_map.put(key,value) >>> else: >>> return await my_map.get(key) Warning: This method returns a clone of the previous value, not the original (identically equal) value previously put into the map. Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: key: Key of the entry. value: Value of the entry. ttl: Maximum time in seconds for this entry to stay in the map. If not provided, the value configured on the server side configuration will be used. Setting this to ``0`` means infinite time-to-live. max_idle: Maximum time in seconds for this entry to stay idle in the map. If not provided, the value configured on the server side configuration will be used. Setting this to ``0`` means infinite max idle time. Returns: Old value of the entry. """ check_not_none(key, "key can't be None") check_not_none(value, "value can't be None") try: key_data = self._to_data(key) value_data = self._to_data(value) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry( e, self.put_if_absent, key, value, ttl, max_idle ) return await self._put_if_absent_internal(key_data, value_data, ttl, max_idle)
[docs] async def put_transient( self, key: KeyType, value: ValueType, ttl: float = None, max_idle: float = None ) -> None: """Same as ``put``, but MapStore defined at the server side will not be called. Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: key: Key of the entry. value: Value of the entry. ttl: Maximum time in seconds for this entry to stay in the map. If not provided, the value configured on the server side configuration will be used. Setting this to ``0`` means infinite time-to-live. max_idle: Maximum time in seconds for this entry to stay idle in the map. If not provided, the value configured on the server side configuration will be used. Setting this to ``0`` means infinite max idle time. """ check_not_none(key, "key can't be None") check_not_none(value, "value can't be None") try: key_data = self._to_data(key) value_data = self._to_data(value) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry( e, self.put_transient, key, value, ttl, max_idle ) return await self._put_transient_internal(key_data, value_data, ttl, max_idle)
[docs] async def remove(self, key: KeyType) -> typing.Optional[ValueType]: """Removes the mapping for a key from this map if it is present. The map will not contain a mapping for the specified key once the call returns. Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: key: Key of the mapping to be deleted. Returns: The previous value associated with key, or ``None`` if there was no mapping for key. """ check_not_none(key, "key can't be None") try: key_data = self._to_data(key) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.remove, key) return await self._remove_internal(key_data)
[docs] async def remove_all(self, predicate: Predicate) -> None: """Removes all entries which match with the supplied predicate. Args: predicate: Used to select entries to be removed from map. """ check_not_none(predicate, "predicate can't be None") try: predicate_data = self._to_data(predicate) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.remove_all, predicate) return await self._remove_all_internal(predicate_data)
[docs] async def remove_if_same(self, key: KeyType, value: ValueType) -> bool: """Removes the entry for a key only if it is currently mapped to a given value. This is equivalent to below, except that the action is performed atomically: >>> if (await my_map.contains_key(key)) and (await my_map.get(key) == value): >>> await my_map.remove(key) >>> return True >>> else: >>> return False Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: key: The specified key. value: Remove the key if it has this value. Returns: ``True`` if the value was removed, ``False`` otherwise. """ check_not_none(key, "key can't be None") check_not_none(value, "value can't be None") try: key_data = self._to_data(key) value_data = self._to_data(value) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.remove_if_same, key, value) return await self._remove_if_same_internal_(key_data, value_data)
[docs] async def remove_entry_listener(self, registration_id: str) -> bool: """Removes the specified entry listener. Returns silently if there is no such listener added before. Args: registration_id: Id of registered listener. Returns: ``True`` if registration is removed, ``False`` otherwise. """ return await self._deregister_listener(registration_id)
[docs] async def remove_interceptor(self, registration_id: str) -> bool: """Removes the given interceptor for this map, so it will not intercept operations anymore. Args: registration_id: Registration ID of the map interceptor. Returns: ``True`` if the interceptor is removed, ``False`` otherwise. """ check_not_none(registration_id, "Interceptor registration id should not be None") request = map_remove_interceptor_codec.encode_request(self.name, registration_id) return await self._invoke(request, map_remove_interceptor_codec.decode_response)
[docs] async def replace(self, key: KeyType, value: ValueType) -> typing.Optional[ValueType]: """Replaces the entry for a key only if it is currently mapped to some value. This is equivalent to below, except that the action is performed atomically: >>> if await my_map.contains_key(key): >>> return await my_map.put(key,value) >>> else: >>> return None Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Warning: This method returns a clone of the previous value, not the original (identically equal) value previously put into the map. Args: key: The specified key. value: The value to replace the previous value. Returns: Previous value associated with key, or ``None`` if there was no mapping for key. """ check_not_none(key, "key can't be None") check_not_none(value, "value can't be None") try: key_data = self._to_data(key) value_data = self._to_data(value) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.replace, key, value) return await self._replace_internal(key_data, value_data)
[docs] async def replace_if_same( self, key: ValueType, old_value: ValueType, new_value: ValueType ) -> bool: """Replaces the entry for a key only if it is currently mapped to a given value. This is equivalent to below, except that the action is performed atomically: >>> if (await my_map.contains_key(key)) and (await my_map.get(key) == old_value): >>> await my_map.put(key, new_value) >>> return True >>> else: >>> return False Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: key: The specified key. old_value: Replace the key value if it is the old value. new_value: The new value to replace the old value. Returns: ``True`` if the value was replaced, ``False`` otherwise. """ check_not_none(key, "key can't be None") check_not_none(old_value, "old_value can't be None") check_not_none(new_value, "new_value can't be None") try: key_data = self._to_data(key) old_value_data = self._to_data(old_value) new_value_data = self._to_data(new_value) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry( e, self.replace_if_same, key, old_value, new_value ) return await self._replace_if_same_internal(key_data, old_value_data, new_value_data)
[docs] async def set( self, key: KeyType, value: ValueType, ttl: float = None, max_idle: float = None ) -> None: """Puts an entry into this map. Similar to the put operation except that set doesn't return the old value, which is more efficient. If ttl is provided, entry will expire and get evicted after the ttl. Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: key: Key of the entry. value: Value of the entry. ttl: Maximum time in seconds for this entry to stay in the map. If not provided, the value configured on the server side configuration will be used. Setting this to ``0`` means infinite time-to-live. max_idle: Maximum time in seconds for this entry to stay idle in the map. If not provided, the value configured on the server side configuration will be used. Setting this to ``0`` means infinite max idle time. """ check_not_none(key, "key can't be None") check_not_none(value, "value can't be None") try: key_data = self._to_data(key) value_data = self._to_data(value) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.set, key, value, ttl, max_idle) return await self._set_internal(key_data, value_data, ttl, max_idle)
[docs] async def set_ttl(self, key: KeyType, ttl: float) -> None: """Updates the TTL (time to live) value of the entry specified by the given key with a new TTL value. New TTL value is valid starting from the time this operation is invoked, not since the time the entry was created. If the entry does not exist or is already expired, this call has no effect. Args: key: The key of the map entry. ttl: Maximum time in seconds for this entry to stay in the map. Setting this to ``0`` means infinite time-to-live. """ check_not_none(key, "key can't be None") check_not_none(ttl, "ttl can't be None") try: key_data = self._to_data(key) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.set_ttl, key, ttl) return await self._set_ttl_internal(key_data, ttl)
[docs] async def size(self) -> int: """Returns the number of entries in this map. Returns: Number of entries in this map. """ request = map_size_codec.encode_request(self.name) return await self._invoke(request, map_size_codec.decode_response)
[docs] async def try_put(self, key: KeyType, value: ValueType, timeout: float = 0) -> bool: """Tries to put the given key and value into this map and returns immediately if timeout is not provided. If timeout is provided, operation waits until it is completed or timeout is reached. Args: key: Key of the entry. value: Value of the entry. timeout: Maximum time in seconds to wait. Returns: ``True`` if the put is successful, ``False`` otherwise. """ check_not_none(key, "key can't be None") check_not_none(value, "value can't be None") try: key_data = self._to_data(key) value_data = self._to_data(value) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.try_put, key, value, timeout) return await self._try_put_internal(key_data, value_data, timeout)
[docs] async def try_remove(self, key: KeyType, timeout: float = 0) -> bool: """Tries to remove the given key from this map and returns immediately if timeout is not provided. If timeout is provided, operation waits until it is completed or timeout is reached. Args: key: Key of the entry to be deleted. timeout: Maximum time in seconds to wait. Returns: ``True`` if the remove is successful, ``False`` otherwise. """ check_not_none(key, "key can't be None") try: key_data = self._to_data(key) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.try_remove, key, timeout) return await self._try_remove_internal(key_data, timeout)
[docs] async def values(self, predicate: Predicate = None) -> typing.List[ValueType]: """Returns a list clone of the values contained in this map or values of the entries which are filtered with the predicate if provided. Warning: The list is NOT backed by the map, so changes to the map are NOT reflected in the list, and vice-versa. Args: predicate: Predicate to filter the entries. Returns: A list of clone of the values contained in this map. """ if predicate: if isinstance(predicate, _PagingPredicate): predicate.iteration_type = IterationType.VALUE try: holder = PagingPredicateHolder.of(predicate, self._to_data) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.values, predicate) def handler(message): response = map_values_with_paging_predicate_codec.decode_response(message) predicate.anchor_list = response["anchor_data_list"].as_anchor_list( self._to_object ) data_list = response["response"] return deserialize_list_in_place(data_list, self._to_object) request = map_values_with_paging_predicate_codec.encode_request(self.name, holder) else: try: predicate_data = self._to_data(predicate) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.values, predicate) def handler(message): data_list = map_values_with_predicate_codec.decode_response(message) return deserialize_list_in_place(data_list, self._to_object) request = map_values_with_predicate_codec.encode_request(self.name, predicate_data) else: def handler(message): data_list = map_values_codec.decode_response(message) return deserialize_list_in_place(data_list, self._to_object) request = map_values_codec.encode_request(self.name) return await self._invoke(request, handler)
def _contains_key_internal(self, key_data): request = map_contains_key_codec.encode_request(self.name, key_data, thread_id()) return self._invoke_on_key(request, key_data, map_contains_key_codec.decode_response) def _get_internal(self, key_data): def handler(message): return self._to_object(map_get_codec.decode_response(message)) request = map_get_codec.encode_request(self.name, key_data, thread_id()) return self._invoke_on_key(request, key_data, handler) async def _get_all_internal(self, partition_to_keys, tasks=None): def handler(message): entry_data_list = map_get_all_codec.decode_response(message) return deserialize_entry_list_in_place(entry_data_list, self._to_object) tasks = tasks or [] async with asyncio.TaskGroup() as tg: for partition_id, key_dict in partition_to_keys.items(): request = map_get_all_codec.encode_request(self.name, key_dict.values()) task = tg.create_task(self._ainvoke_on_partition(request, partition_id, handler)) tasks.append(task) kvs = itertools.chain.from_iterable(task.result() for task in tasks) return dict(kvs) def _remove_internal(self, key_data): def handler(message): return self._to_object(map_remove_codec.decode_response(message)) request = map_remove_codec.encode_request(self.name, key_data, thread_id()) return self._invoke_on_key(request, key_data, handler) def _remove_all_internal(self, predicate_data): request = map_remove_all_codec.encode_request(self.name, predicate_data) return self._invoke(request) def _remove_if_same_internal_(self, key_data, value_data): request = map_remove_if_same_codec.encode_request( self.name, key_data, value_data, thread_id() ) return self._invoke_on_key( request, key_data, response_handler=map_remove_if_same_codec.decode_response ) def _delete_internal(self, key_data): request = map_delete_codec.encode_request(self.name, key_data, thread_id()) return self._invoke_on_key(request, key_data) async def _put_internal(self, key_data, value_data, ttl, max_idle): def handler(message): return self._to_object(map_put_codec.decode_response(message)) if max_idle is not None: request = map_put_with_max_idle_codec.encode_request( self.name, key_data, value_data, thread_id(), to_millis(ttl), to_millis(max_idle) ) else: request = map_put_codec.encode_request( self.name, key_data, value_data, thread_id(), to_millis(ttl) ) return await self._invoke_on_key(request, key_data, handler) def _set_internal(self, key_data, value_data, ttl, max_idle): if max_idle is not None: request = map_set_with_max_idle_codec.encode_request( self.name, key_data, value_data, thread_id(), to_millis(ttl), to_millis(max_idle) ) else: request = map_set_codec.encode_request( self.name, key_data, value_data, thread_id(), to_millis(ttl) ) return self._invoke_on_key(request, key_data) def _set_ttl_internal(self, key_data, ttl): request = map_set_ttl_codec.encode_request(self.name, key_data, to_millis(ttl)) return self._invoke_on_key(request, key_data, map_set_ttl_codec.decode_response) def _try_remove_internal(self, key_data, timeout): request = map_try_remove_codec.encode_request( self.name, key_data, thread_id(), to_millis(timeout) ) return self._invoke_on_key(request, key_data, map_try_remove_codec.decode_response) def _try_put_internal(self, key_data, value_data, timeout): request = map_try_put_codec.encode_request( self.name, key_data, value_data, thread_id(), to_millis(timeout) ) return self._invoke_on_key(request, key_data, map_try_put_codec.decode_response) def _put_transient_internal(self, key_data, value_data, ttl, max_idle): if max_idle is not None: request = map_put_transient_with_max_idle_codec.encode_request( self.name, key_data, value_data, thread_id(), to_millis(ttl), to_millis(max_idle) ) else: request = map_put_transient_codec.encode_request( self.name, key_data, value_data, thread_id(), to_millis(ttl) ) return self._invoke_on_key(request, key_data) def _put_if_absent_internal(self, key_data, value_data, ttl, max_idle): def handler(message): return self._to_object(map_put_if_absent_codec.decode_response(message)) if max_idle is not None: request = map_put_if_absent_with_max_idle_codec.encode_request( self.name, key_data, value_data, thread_id(), to_millis(ttl), to_millis(max_idle) ) else: request = map_put_if_absent_codec.encode_request( self.name, key_data, value_data, thread_id(), to_millis(ttl) ) return self._invoke_on_key(request, key_data, handler) def _replace_if_same_internal(self, key_data, old_value_data, new_value_data): request = map_replace_if_same_codec.encode_request( self.name, key_data, old_value_data, new_value_data, thread_id() ) return self._invoke_on_key(request, key_data, map_replace_if_same_codec.decode_response) def _replace_internal(self, key_data, value_data): def handler(message): return self._to_object(map_replace_codec.decode_response(message)) request = map_replace_codec.encode_request(self.name, key_data, value_data, thread_id()) return self._invoke_on_key(request, key_data, handler) def _evict_internal(self, key_data): request = map_evict_codec.encode_request(self.name, key_data, thread_id()) return self._invoke_on_key(request, key_data, map_evict_codec.decode_response) def _load_all_internal(self, key_data_list, replace_existing_values): request = map_load_given_keys_codec.encode_request( self.name, key_data_list, replace_existing_values ) return self._invoke(request) def _execute_on_key_internal(self, key_data, entry_processor_data): def handler(message): return self._to_object(map_execute_on_key_codec.decode_response(message)) request = map_execute_on_key_codec.encode_request( self.name, entry_processor_data, key_data, thread_id() ) return self._invoke_on_key(request, key_data, handler)
class MapFeatNearCache(Map[KeyType, ValueType]): """Map proxy implementation featuring Near Cache""" def __init__(self, service_name, name, context): super(MapFeatNearCache, self).__init__(service_name, name, context) self._invalidation_listener_id = None self._near_cache = context.near_cache_manager.get_or_create_near_cache(name) async def clear(self): self._near_cache._clear() return await super(MapFeatNearCache, self).clear() async def evict_all(self): self._near_cache.clear() return await super(MapFeatNearCache, self).evict_all() async def load_all(self, keys=None, replace_existing_values=True): if keys is None and replace_existing_values: self._near_cache.clear() return await super(MapFeatNearCache, self).load_all(keys, replace_existing_values) async def _on_destroy(self): await self._remove_near_cache_invalidation_listener() self._near_cache.clear() await super(MapFeatNearCache, self)._on_destroy() async def _add_near_cache_invalidation_listener(self): codec = map_add_near_cache_invalidation_listener_codec request = codec.encode_request(self.name, EntryEventType.INVALIDATION, self._is_smart) self._invalidation_listener_id = await self._register_listener( request, lambda r: codec.decode_response(r), lambda reg_id: map_remove_entry_listener_codec.encode_request(self.name, reg_id), lambda m: codec.handle(m, self._handle_invalidation, self._handle_batch_invalidation), ) async def _remove_near_cache_invalidation_listener(self): if self._invalidation_listener_id: await self.remove_entry_listener(self._invalidation_listener_id) def _handle_invalidation(self, key, source_uuid, partition_uuid, sequence): # key is always ``Data`` # null key means near cache has to remove all entries in it. # see MapAddNearCacheEntryListenerMessageTask. if key is None: self._near_cache._clear() else: self._invalidate_cache(key) def _handle_batch_invalidation(self, keys, source_uuids, partition_uuids, sequences): # key_list is always list of ``Data`` for key_data in keys: self._invalidate_cache(key_data) def _invalidate_cache(self, key_data): self._near_cache._invalidate(key_data) def _invalidate_cache_batch(self, key_data_list): for key_data in key_data_list: self._near_cache._invalidate(key_data) # internals async def _contains_key_internal(self, key_data): try: return self._near_cache[key_data] except KeyError: return await super(MapFeatNearCache, self)._contains_key_internal(key_data) async def _get_internal(self, key_data): try: return self._near_cache[key_data] except KeyError: value = await super(MapFeatNearCache, self)._get_internal(key_data) self._near_cache.__setitem__(key_data, value) return value async def _get_all_internal(self, partition_to_keys, tasks=None): tasks = tasks or [] for key_dic in partition_to_keys.values(): for key in list(key_dic.keys()): try: key_data = key_dic[key] value = self._near_cache[key_data] future = asyncio.Future() future.set_result((key, value)) tasks.append(future) del key_dic[key] except KeyError: pass return await super(MapFeatNearCache, self)._get_all_internal(partition_to_keys, tasks) def _try_remove_internal(self, key_data, timeout): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._try_remove_internal(key_data, timeout) def _try_put_internal(self, key_data, value_data, timeout): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._try_put_internal(key_data, value_data, timeout) def _set_internal(self, key_data, value_data, ttl, max_idle): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._set_internal(key_data, value_data, ttl, max_idle) def _set_ttl_internal(self, key_data, ttl): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._set_ttl_internal(key_data, ttl) def _replace_internal(self, key_data, value_data): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._replace_internal(key_data, value_data) def _replace_if_same_internal(self, key_data, old_value_data, new_value_data): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._replace_if_same_internal( key_data, old_value_data, new_value_data ) def _remove_internal(self, key_data): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._remove_internal(key_data) def _remove_all_internal(self, predicate_data): self._near_cache.clear() return super(MapFeatNearCache, self)._remove_all_internal(predicate_data) def _remove_if_same_internal_(self, key_data, value_data): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._remove_if_same_internal_(key_data, value_data) def _put_transient_internal(self, key_data, value_data, ttl, max_idle): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._put_transient_internal( key_data, value_data, ttl, max_idle ) async def _put_internal(self, key_data, value_data, ttl, max_idle): self._invalidate_cache(key_data) return await super(MapFeatNearCache, self)._put_internal( key_data, value_data, ttl, max_idle ) def _put_if_absent_internal(self, key_data, value_data, ttl, max_idle): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._put_if_absent_internal( key_data, value_data, ttl, max_idle ) def _load_all_internal(self, key_data_list, replace_existing_values): self._invalidate_cache_batch(key_data_list) return super(MapFeatNearCache, self)._load_all_internal( key_data_list, replace_existing_values ) def _execute_on_key_internal(self, key_data, entry_processor_data): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._execute_on_key_internal( key_data, entry_processor_data ) def _evict_internal(self, key_data): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._evict_internal(key_data) def _delete_internal(self, key_data): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._delete_internal(key_data) async def create_map_proxy(service_name, name, context): near_cache_config = context.config.near_caches.get(name, None) if near_cache_config is None: return Map(service_name, name, context) nc = MapFeatNearCache(service_name, name, context) if nc._near_cache.invalidate_on_change: await nc._add_near_cache_invalidation_listener() return nc