Source code for hazelcast.proxy.map

import itertools
import typing

from hazelcast.aggregator import Aggregator
from hazelcast.config import IndexUtil, IndexType, IndexConfig
from hazelcast.core import SimpleEntryView
from hazelcast.future import combine_futures, ImmediateFuture, Future
from hazelcast.invocation import Invocation
from hazelcast.projection import Projection
from hazelcast.protocol import PagingPredicateHolder
from hazelcast.protocol.codec import (
    map_add_entry_listener_codec,
    map_add_entry_listener_to_key_codec,
    map_add_entry_listener_with_predicate_codec,
    map_add_entry_listener_to_key_with_predicate_codec,
    map_clear_codec,
    map_contains_key_codec,
    map_contains_value_codec,
    map_delete_codec,
    map_entry_set_codec,
    map_entries_with_predicate_codec,
    map_evict_codec,
    map_evict_all_codec,
    map_flush_codec,
    map_force_unlock_codec,
    map_get_codec,
    map_get_all_codec,
    map_get_entry_view_codec,
    map_is_empty_codec,
    map_is_locked_codec,
    map_key_set_codec,
    map_key_set_with_predicate_codec,
    map_load_all_codec,
    map_load_given_keys_codec,
    map_lock_codec,
    map_put_codec,
    map_put_all_codec,
    map_put_if_absent_codec,
    map_put_transient_codec,
    map_size_codec,
    map_remove_codec,
    map_remove_if_same_codec,
    map_remove_entry_listener_codec,
    map_replace_codec,
    map_replace_if_same_codec,
    map_set_codec,
    map_try_lock_codec,
    map_try_put_codec,
    map_try_remove_codec,
    map_unlock_codec,
    map_values_codec,
    map_values_with_predicate_codec,
    map_add_interceptor_codec,
    map_aggregate_codec,
    map_aggregate_with_predicate_codec,
    map_project_codec,
    map_project_with_predicate_codec,
    map_execute_on_all_keys_codec,
    map_execute_on_key_codec,
    map_execute_on_keys_codec,
    map_execute_with_predicate_codec,
    map_add_near_cache_invalidation_listener_codec,
    map_add_index_codec,
    map_set_ttl_codec,
    map_entries_with_paging_predicate_codec,
    map_key_set_with_paging_predicate_codec,
    map_values_with_paging_predicate_codec,
    map_put_with_max_idle_codec,
    map_put_if_absent_with_max_idle_codec,
    map_put_transient_with_max_idle_codec,
    map_set_with_max_idle_codec,
    map_remove_interceptor_codec,
    map_remove_all_codec,
)
from hazelcast.proxy.base import (
    Proxy,
    EntryEvent,
    EntryEventType,
    get_entry_listener_flags,
    MAX_SIZE,
)
from hazelcast.predicate import Predicate, _PagingPredicate
from hazelcast.serialization.data import Data
from hazelcast.types import AggregatorResultType, KeyType, ValueType, ProjectionType
from hazelcast.serialization.compact import SchemaNotReplicatedError
from hazelcast.util import (
    check_not_none,
    thread_id,
    to_millis,
    IterationType,
    deserialize_entry_list_in_place,
    deserialize_list_in_place,
)


EntryEventCallable = typing.Callable[[EntryEvent[KeyType, ValueType]], None]


[docs]class Map(Proxy["BlockingMap"], typing.Generic[KeyType, ValueType]): """Hazelcast Map client proxy to access the map on the cluster. Concurrent, distributed, observable and queryable map. This map can work both async(non-blocking) or sync(blocking). Blocking calls return the value of the call and block the execution until return value is calculated. However, async calls return ``Future`` and do not block execution. Result of the ``Future`` can be used whenever ready. A ``Future``'s result can be obtained with blocking the execution by calling ``future.result()``. Example: >>> my_map = client.get_map("my_map").blocking() # sync map, all operations are blocking >>> print("map.put", my_map.put("key", "value")) >>> print("map.contains_key", my_map.contains_key("key")) >>> print("map.get", my_map.get("key")) >>> print("map.size", my_map.size()) Example: >>> my_map = client.get_map("map") # async map, all operations are non-blocking >>> def put_callback(f): >>> print("map.put", f.result()) >>> my_map.put("key", "async_val").add_done_callback(put_callback) >>> >>> print("map.size", my_map.size().result()) >>> >>> def contains_key_callback(f): >>> print("map.contains_key", f.result()) >>> my_map.contains_key("key").add_done_callback(contains_key_callback) This class does not allow ``None`` to be used as a key or value. """ def __init__(self, service_name, name, context): super(Map, self).__init__(service_name, name, context) self._reference_id_generator = context.lock_reference_id_generator
[docs] def add_entry_listener( self, include_value: bool = False, key: KeyType = None, predicate: Predicate = None, added_func: EntryEventCallable = None, removed_func: EntryEventCallable = None, updated_func: EntryEventCallable = None, evicted_func: EntryEventCallable = None, evict_all_func: EntryEventCallable = None, clear_all_func: EntryEventCallable = None, merged_func: EntryEventCallable = None, expired_func: EntryEventCallable = None, loaded_func: EntryEventCallable = None, ) -> Future[str]: """Adds a continuous entry listener for this map. Listener will get notified for map events filtered with given parameters. Args: include_value: Whether received event should include the value or not. key: Key for filtering the events. predicate: Predicate for filtering the events. added_func: Function to be called when an entry is added to map. removed_func: Function to be called when an entry is removed from map. updated_func: Function to be called when an entry is updated. evicted_func: Function to be called when an entry is evicted from map. evict_all_func: Function to be called when entries are evicted from map. clear_all_func: Function to be called when entries are cleared from map. merged_func: Function to be called when WAN replicated entry is merged. expired_func: Function to be called when an entry's live time is expired. loaded_func: Function to be called when an entry is loaded from a map loader. Returns: A registration id which is used as a key to remove the listener. """ flags = get_entry_listener_flags( ADDED=added_func, REMOVED=removed_func, UPDATED=updated_func, EVICTED=evicted_func, EXPIRED=expired_func, EVICT_ALL=evict_all_func, CLEAR_ALL=clear_all_func, MERGED=merged_func, LOADED=loaded_func, ) if key is not None and predicate is not None: try: key_data = self._to_data(key) predicate_data = self._to_data(predicate) except SchemaNotReplicatedError as e: return self._send_schema_and_retry( e, self.add_entry_listener, include_value, key, predicate, added_func, removed_func, updated_func, evicted_func, evict_all_func, clear_all_func, merged_func, expired_func, loaded_func, ) with_key_and_predicate_codec = map_add_entry_listener_to_key_with_predicate_codec request = with_key_and_predicate_codec.encode_request( self.name, key_data, predicate_data, include_value, flags, self._is_smart ) response_decoder = with_key_and_predicate_codec.decode_response event_message_handler = with_key_and_predicate_codec.handle elif key is not None and predicate is None: try: key_data = self._to_data(key) except SchemaNotReplicatedError as e: return self._send_schema_and_retry( e, self.add_entry_listener, include_value, key, predicate, added_func, removed_func, updated_func, evicted_func, evict_all_func, clear_all_func, merged_func, expired_func, loaded_func, ) with_key_codec = map_add_entry_listener_to_key_codec request = with_key_codec.encode_request( self.name, key_data, include_value, flags, self._is_smart ) response_decoder = with_key_codec.decode_response event_message_handler = with_key_codec.handle elif key is None and predicate is not None: try: predicate = self._to_data(predicate) except SchemaNotReplicatedError as e: return self._send_schema_and_retry( e, self.add_entry_listener, include_value, key, predicate, added_func, removed_func, updated_func, evicted_func, evict_all_func, clear_all_func, merged_func, expired_func, loaded_func, ) with_predicate_codec = map_add_entry_listener_with_predicate_codec request = with_predicate_codec.encode_request( self.name, predicate, include_value, flags, self._is_smart ) response_decoder = with_predicate_codec.decode_response event_message_handler = with_predicate_codec.handle else: codec = map_add_entry_listener_codec request = codec.encode_request(self.name, include_value, flags, self._is_smart) response_decoder = codec.decode_response event_message_handler = codec.handle def handle_event_entry( key_data, value_data, old_value_data, merging_value_data, event_type, uuid, number_of_affected_entries, ): event = EntryEvent( self._to_object(key_data), self._to_object(value_data), self._to_object(old_value_data), self._to_object(merging_value_data), event_type, uuid, number_of_affected_entries, ) if event.event_type == EntryEventType.ADDED: added_func(event) elif event.event_type == EntryEventType.REMOVED: removed_func(event) elif event.event_type == EntryEventType.UPDATED: updated_func(event) elif event.event_type == EntryEventType.EVICTED: evicted_func(event) elif event.event_type == EntryEventType.EVICT_ALL: evict_all_func(event) elif event.event_type == EntryEventType.CLEAR_ALL: clear_all_func(event) elif event.event_type == EntryEventType.MERGED: merged_func(event) elif event.event_type == EntryEventType.EXPIRED: expired_func(event) elif event.event_type == EntryEventType.LOADED: loaded_func(event) return self._register_listener( request, lambda r: response_decoder(r), lambda reg_id: map_remove_entry_listener_codec.encode_request(self.name, reg_id), lambda m: event_message_handler(m, handle_event_entry), )
[docs] def add_index( self, attributes: typing.Sequence[str] = None, index_type: typing.Union[int, str] = IndexType.SORTED, name: str = None, bitmap_index_options: typing.Dict[str, typing.Any] = None, ) -> Future[None]: """Adds an index to this map for the specified entries so that queries can run faster. Example: Let's say your map values are Employee objects. >>> class Employee(IdentifiedDataSerializable): >>> active = false >>> age = None >>> name = None >>> #other fields >>> >>> #methods If you query your values mostly based on age and active fields, you should consider indexing these. >>> employees = client.get_map("employees") >>> employees.add_index(attributes=["age"]) # Sorted index for range queries >>> employees.add_index(attributes=["active"], index_type=IndexType.HASH)) # Hash index for equality predicates Index attribute should either have a getter method or be public. You should also make sure to add the indexes before adding entries to this map. Indexing time is executed in parallel on each partition by operation threads. The Map is not blocked during this operation. The time taken in proportional to the size of the Map and the number Members. Until the index finishes being created, any searches for the attribute will use a full Map scan, thus avoiding using a partially built index and returning incorrect results. Args: attributes: List of indexed attributes. index_type: Type of the index. By default, set to ``SORTED``. name: Name of the index. bitmap_index_options: Bitmap index options. - **unique_key:** (str): The unique key attribute is used as a source of values which uniquely identify each entry being inserted into an index. Defaults to ``KEY_ATTRIBUTE_NAME``. See the :class:`hazelcast.config.QueryConstants` for possible values. - **unique_key_transformation** (int|str): The transformation is applied to every value extracted from the unique key attribue. Defaults to ``OBJECT``. See the :class:`hazelcast.config.UniqueKeyTransformation` for possible values. """ d = { "name": name, "type": index_type, "attributes": attributes, "bitmap_index_options": bitmap_index_options, } config = IndexConfig.from_dict(d) validated = IndexUtil.validate_and_normalize(self.name, config) request = map_add_index_codec.encode_request(self.name, validated) return self._invoke(request)
[docs] def add_interceptor(self, interceptor: typing.Any) -> Future[str]: """Adds an interceptor for this map. Added interceptor will intercept operations and execute user defined methods. Args: interceptor: Interceptor for the map which includes user defined methods. Returns: Id of registered interceptor. """ try: interceptor_data = self._to_data(interceptor) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.add_interceptor, interceptor) request = map_add_interceptor_codec.encode_request(self.name, interceptor_data) return self._invoke(request, map_add_interceptor_codec.decode_response)
[docs] def aggregate( self, aggregator: Aggregator[AggregatorResultType], predicate: Predicate = None ) -> Future[AggregatorResultType]: """Applies the aggregation logic on map entries and filter the result with the predicate, if given. Args: aggregator: Aggregator to aggregate the entries with. predicate: Predicate to filter the entries with. Returns: The result of the aggregation. """ check_not_none(aggregator, "aggregator can't be none") if predicate: if isinstance(predicate, _PagingPredicate): raise AssertionError("Paging predicate is not supported.") try: aggregator_data = self._to_data(aggregator) predicate_data = self._to_data(predicate) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.aggregate, aggregator, predicate) def handler(message): return self._to_object(map_aggregate_with_predicate_codec.decode_response(message)) request = map_aggregate_with_predicate_codec.encode_request( self.name, aggregator_data, predicate_data ) else: try: aggregator_data = self._to_data(aggregator) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.aggregate, aggregator, predicate) def handler(message): return self._to_object(map_aggregate_codec.decode_response(message)) request = map_aggregate_codec.encode_request(self.name, aggregator_data) return self._invoke(request, handler)
[docs] def clear(self) -> Future[None]: """Clears the map. The ``MAP_CLEARED`` event is fired for any registered listeners. """ request = map_clear_codec.encode_request(self.name) return self._invoke(request)
[docs] def contains_key(self, key: KeyType) -> Future[bool]: """Determines whether this map contains an entry with the key. Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: key: The specified key. Returns: ``True`` if this map contains an entry for the specified key, ``False`` otherwise. """ check_not_none(key, "key can't be None") try: key_data = self._to_data(key) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.contains_key, key) return self._contains_key_internal(key_data)
[docs] def contains_value(self, value: ValueType) -> Future[bool]: """Determines whether this map contains one or more keys for the specified value. Args: value: The specified value. Returns: ``True`` if this map contains an entry for the specified value, ``False`` otherwise. """ check_not_none(value, "value can't be None") try: value_data = self._to_data(value) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.contains_value, value) request = map_contains_value_codec.encode_request(self.name, value_data) return self._invoke(request, map_contains_value_codec.decode_response)
[docs] def delete(self, key: KeyType) -> Future[None]: """Removes the mapping for a key from this map if it is present (optional operation). Unlike remove(object), this operation does not return the removed value, which avoids the serialization cost of the returned value. If the removed value will not be used, a delete operation is preferred over a remove operation for better performance. The map will not contain a mapping for the specified key once the call returns. Warning: This method breaks the contract of EntryListener. When an entry is removed by delete(), it fires an ``EntryEvent`` with a ``None`` ``old_value``. Also, a listener with predicates will have ``None`` values, so only the keys can be queried via predicates. Args: key: Key of the mapping to be deleted. """ check_not_none(key, "key can't be None") try: key_data = self._to_data(key) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.delete, key) return self._delete_internal(key_data)
[docs] def entry_set( self, predicate: Predicate = None ) -> Future[typing.List[typing.Tuple[KeyType, ValueType]]]: """Returns a list clone of the mappings contained in this map. Warning: The list is NOT backed by the map, so changes to the map are NOT reflected in the list, and vice-versa. Args: predicate: Predicate for the map to filter entries. Returns: The list of key-value tuples in the map. """ if predicate: if isinstance(predicate, _PagingPredicate): predicate.iteration_type = IterationType.ENTRY try: holder = PagingPredicateHolder.of(predicate, self._to_data) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.entry_set, predicate) def handler(message): response = map_entries_with_paging_predicate_codec.decode_response(message) predicate.anchor_list = response["anchor_data_list"].as_anchor_list( self._to_object ) entry_data_list = response["response"] return deserialize_entry_list_in_place(entry_data_list, self._to_object) request = map_entries_with_paging_predicate_codec.encode_request(self.name, holder) else: try: predicate_data = self._to_data(predicate) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.entry_set, predicate) def handler(message): entry_data_list = map_entries_with_predicate_codec.decode_response(message) return deserialize_entry_list_in_place(entry_data_list, self._to_object) request = map_entries_with_predicate_codec.encode_request(self.name, predicate_data) else: def handler(message): entry_data_list = map_entry_set_codec.decode_response(message) return deserialize_entry_list_in_place(entry_data_list, self._to_object) request = map_entry_set_codec.encode_request(self.name) return self._invoke(request, handler)
[docs] def evict(self, key: KeyType) -> Future[bool]: """Evicts the specified key from this map. Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: key: Key to evict. Returns: ``True`` if the key is evicted, ``False`` otherwise. """ check_not_none(key, "key can't be None") try: key_data = self._to_data(key) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.evict, key) return self._evict_internal(key_data)
[docs] def evict_all(self) -> Future[None]: """Evicts all keys from this map except the locked ones. The ``EVICT_ALL`` event is fired for any registered listeners. """ request = map_evict_all_codec.encode_request(self.name) return self._invoke(request)
[docs] def execute_on_entries( self, entry_processor: typing.Any, predicate: Predicate = None ) -> Future[typing.List[typing.Any]]: """Applies the user defined EntryProcessor to all the entries in the map or entries in the map which satisfies the predicate if provided. Returns the results mapped by each key in the map. Args: entry_processor: A stateful serializable object which represents the EntryProcessor defined on server side. This object must have a serializable EntryProcessor counter part registered on server side with the actual ``com.hazelcast.map.EntryProcessor`` implementation. predicate: Predicate for filtering the entries. Returns: List of map entries which includes the keys and the results of the entry process. """ if predicate: try: entry_processor_data = self._to_data(entry_processor) predicate_data = self._to_data(predicate) except SchemaNotReplicatedError as e: return self._send_schema_and_retry( e, self.execute_on_entries, entry_processor, predicate ) def handler(message): entry_data_list = map_execute_with_predicate_codec.decode_response(message) return deserialize_entry_list_in_place(entry_data_list, self._to_object) request = map_execute_with_predicate_codec.encode_request( self.name, entry_processor_data, predicate_data ) else: try: entry_processor_data = self._to_data(entry_processor) except SchemaNotReplicatedError as e: return self._send_schema_and_retry( e, self.execute_on_entries, entry_processor, predicate ) def handler(message): entry_data_list = map_execute_on_all_keys_codec.decode_response(message) return deserialize_entry_list_in_place(entry_data_list, self._to_object) request = map_execute_on_all_keys_codec.encode_request(self.name, entry_processor_data) return self._invoke(request, handler)
[docs] def execute_on_key(self, key: KeyType, entry_processor: typing.Any) -> Future[typing.Any]: """Applies the user defined EntryProcessor to the entry mapped by the key. Returns the object which is the result of EntryProcessor's process method. Args: key: Specified key for the entry to be processed. entry_processor: A stateful serializable object which represents the EntryProcessor defined on server side. This object must have a serializable EntryProcessor counter part registered on server side with the actual ``com.hazelcast.map.EntryProcessor`` implementation. Returns: Result of entry process. """ check_not_none(key, "key can't be None") try: key_data = self._to_data(key) entry_processor_data = self._to_data(entry_processor) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.execute_on_key, key, entry_processor) return self._execute_on_key_internal(key_data, entry_processor_data)
[docs] def execute_on_keys( self, keys: typing.Sequence[KeyType], entry_processor: typing.Any ) -> Future[typing.List[typing.Any]]: """Applies the user defined EntryProcessor to the entries mapped by the collection of keys. Returns the results mapped by each key in the collection. Args: keys: Collection of the keys for the entries to be processed. entry_processor: A stateful serializable object which represents the EntryProcessor defined on server side. This object must have a serializable EntryProcessor counter part registered on server side with the actual ``com.hazelcast.map.EntryProcessor`` implementation. Returns: List of map entries which includes the keys and the results of the entry process. """ if len(keys) == 0: return ImmediateFuture([]) try: key_list = [] for key in keys: check_not_none(key, "key can't be None") key_list.append(self._to_data(key)) entry_processor_data = self._to_data(entry_processor) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.execute_on_keys, keys, entry_processor) def handler(message): entry_data_list = map_execute_on_keys_codec.decode_response(message) return deserialize_entry_list_in_place(entry_data_list, self._to_object) request = map_execute_on_keys_codec.encode_request( self.name, entry_processor_data, key_list ) return self._invoke(request, handler)
[docs] def flush(self) -> Future[None]: """Flushes all the local dirty entries.""" request = map_flush_codec.encode_request(self.name) return self._invoke(request)
[docs] def force_unlock(self, key: KeyType) -> Future[None]: """Releases the lock for the specified key regardless of the lock owner. It always successfully unlocks the key, never blocks, and returns immediately. Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: key: The key to lock. """ check_not_none(key, "key can't be None") try: key_data = self._to_data(key) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.force_unlock, key) request = map_force_unlock_codec.encode_request( self.name, key_data, self._reference_id_generator.get_and_increment() ) return self._invoke_on_key(request, key_data)
[docs] def get(self, key: KeyType) -> Future[typing.Optional[ValueType]]: """Returns the value for the specified key, or ``None`` if this map does not contain this key. Warning: This method returns a clone of original value, modifying the returned value does not change the actual value in the map. One should put modified value back to make changes visible to all nodes. >>> value = my_map.get(key) >>> value.update_some_property() >>> my_map.put(key,value) Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: key: The specified key. Returns: The value for the specified key. """ check_not_none(key, "key can't be None") try: key_data = self._to_data(key) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.get, key) return self._get_internal(key_data)
[docs] def get_all(self, keys: typing.Sequence[KeyType]) -> Future[typing.Dict[KeyType, ValueType]]: """Returns the entries for the given keys. Warning: The returned map is NOT backed by the original map, so changes to the original map are NOT reflected in the returned map, and vice-versa. Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: keys: Keys to get. Returns: Dictionary of map entries. """ check_not_none(keys, "keys can't be None") if not keys: return ImmediateFuture({}) partition_service = self._context.partition_service partition_to_keys: typing.Dict[int, typing.Dict[KeyType, Data]] = {} for key in keys: check_not_none(key, "key can't be None") try: key_data = self._to_data(key) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.get_all, keys) partition_id = partition_service.get_partition_id(key_data) try: partition_to_keys[partition_id][key] = key_data except KeyError: partition_to_keys[partition_id] = {key: key_data} return self._get_all_internal(partition_to_keys)
[docs] def get_entry_view(self, key: KeyType) -> Future[SimpleEntryView[KeyType, ValueType]]: """Returns the EntryView for the specified key. Warning: This method returns a clone of original mapping, modifying the returned value does not change the actual value in the map. One should put modified value back to make changes visible to all nodes. Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: key: The key of the entry. Returns: EntryView of the specified key. """ check_not_none(key, "key can't be None") try: key_data = self._to_data(key) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.get_entry_view, key) def handler(message): response = map_get_entry_view_codec.decode_response(message) entry_view = response["response"] if not entry_view: return None entry_view.key = self._to_object(entry_view.key) entry_view.value = self._to_object(entry_view.value) return entry_view request = map_get_entry_view_codec.encode_request(self.name, key_data, thread_id()) return self._invoke_on_key(request, key_data, handler)
[docs] def is_empty(self) -> Future[bool]: """Returns whether this map contains no key-value mappings or not. Returns: ``True`` if this map contains no key-value mappings, ``False`` otherwise. """ request = map_is_empty_codec.encode_request(self.name) return self._invoke(request, map_is_empty_codec.decode_response)
[docs] def is_locked(self, key: KeyType) -> Future[bool]: """Checks the lock for the specified key. Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: key: The key that is checked for lock Returns: ``True`` if lock is acquired, ``False`` otherwise. """ check_not_none(key, "key can't be None") try: key_data = self._to_data(key) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.is_locked, key) request = map_is_locked_codec.encode_request(self.name, key_data) return self._invoke_on_key(request, key_data, map_is_locked_codec.decode_response)
[docs] def key_set(self, predicate: Predicate = None) -> Future[typing.List[ValueType]]: """Returns a List clone of the keys contained in this map or the keys of the entries filtered with the predicate if provided. Warning: The list is NOT backed by the map, so changes to the map are NOT reflected in the list, and vice-versa. Args: predicate: Predicate to filter the entries. Returns: A list of the clone of the keys. """ if predicate: if isinstance(predicate, _PagingPredicate): predicate.iteration_type = IterationType.KEY try: holder = PagingPredicateHolder.of(predicate, self._to_data) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.key_set, predicate) def handler(message): response = map_key_set_with_paging_predicate_codec.decode_response(message) predicate.anchor_list = response["anchor_data_list"].as_anchor_list( self._to_object ) data_list = response["response"] return deserialize_list_in_place(data_list, self._to_object) request = map_key_set_with_paging_predicate_codec.encode_request(self.name, holder) else: try: predicate_data = self._to_data(predicate) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.key_set, predicate) def handler(message): data_list = map_key_set_with_predicate_codec.decode_response(message) return deserialize_list_in_place(data_list, self._to_object) request = map_key_set_with_predicate_codec.encode_request(self.name, predicate_data) else: def handler(message): data_list = map_key_set_codec.decode_response(message) return deserialize_list_in_place(data_list, self._to_object) request = map_key_set_codec.encode_request(self.name) return self._invoke(request, handler)
[docs] def load_all( self, keys: typing.Sequence[KeyType] = None, replace_existing_values: bool = True ) -> Future[None]: """Loads all keys from the store at server side or loads the given keys if provided. Args: keys: Keys of the entry values to load. replace_existing_values: Whether the existing values will be replaced or not with those loaded from the server side MapLoader. """ if keys: try: key_data_list = [self._to_data(key) for key in keys] except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.load_all, keys, replace_existing_values) return self._load_all_internal(key_data_list, replace_existing_values) request = map_load_all_codec.encode_request(self.name, replace_existing_values) return self._invoke(request)
[docs] def lock(self, key: KeyType, lease_time: float = None) -> Future[None]: """Acquires the lock for the specified key infinitely or for the specified lease time if provided. If the lock is not available, the current thread becomes disabled for thread scheduling purposes and lies dormant until the lock has been acquired. You get a lock whether the value is present in the map or not. Other threads (possibly on other systems) would block on their invoke of lock() until the non-existent key is unlocked. If the lock holder introduces the key to the map, the put() operation is not blocked. If a thread not holding a lock on the non-existent key tries to introduce the key while a lock exists on the non-existent key, the put() operation blocks until it is unlocked. Scope of the lock is this map only. Acquired lock is only for the key in this map. Locks are re-entrant; so, if the key is locked N times, it should be unlocked N times before another thread can acquire it. Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: key: The key to lock. lease_time: Time in seconds to wait before releasing the lock. """ check_not_none(key, "key can't be None") try: key_data = self._to_data(key) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.lock, key, lease_time) request = map_lock_codec.encode_request( self.name, key_data, thread_id(), to_millis(lease_time), self._reference_id_generator.get_and_increment(), ) partition_id = self._context.partition_service.get_partition_id(key_data) invocation = Invocation(request, partition_id=partition_id, timeout=MAX_SIZE) self._invocation_service.invoke(invocation) return invocation.future
[docs] def project( self, projection: Projection[ProjectionType], predicate: Predicate = None ) -> Future[ProjectionType]: """Applies the projection logic on map entries and filter the result with the predicate, if given. Args: projection: Projection to project the entries with. predicate: Predicate to filter the entries with. Returns: The result of the projection. """ check_not_none(projection, "Projection can't be none") if predicate: if isinstance(predicate, _PagingPredicate): raise AssertionError("Paging predicate is not supported.") try: projection_data = self._to_data(projection) predicate_data = self._to_data(predicate) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.project, projection, predicate) def handler(message): data_list = map_project_with_predicate_codec.decode_response(message) return deserialize_list_in_place(data_list, self._to_object) request = map_project_with_predicate_codec.encode_request( self.name, projection_data, predicate_data ) else: try: projection_data = self._to_data(projection) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.project, projection, predicate) def handler(message): data_list = map_project_codec.decode_response(message) return deserialize_list_in_place(data_list, self._to_object) request = map_project_codec.encode_request(self.name, projection_data) return self._invoke(request, handler)
[docs] def put( self, key: KeyType, value: ValueType, ttl: float = None, max_idle: float = None ) -> Future[typing.Optional[ValueType]]: """Associates the specified value with the specified key in this map. If the map previously contained a mapping for the key, the old value is replaced by the specified value. If ttl is provided, entry will expire and get evicted after the ttl. Warning: This method returns a clone of the previous value, not the original (identically equal) value previously put into the map. Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: key: The specified key. value: The value to associate with the key. ttl: Maximum time in seconds for this entry to stay in the map. If not provided, the value configured on the server side configuration will be used. Setting this to ``0`` means infinite time-to-live. max_idle: Maximum time in seconds for this entry to stay idle in the map. If not provided, the value configured on the server side configuration will be used. Setting this to ``0`` means infinite max idle time. Returns: Previous value associated with key or ``None`` if there was no mapping for key. """ check_not_none(key, "key can't be None") check_not_none(value, "value can't be None") try: key_data = self._to_data(key) value_data = self._to_data(value) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.put, key, value, ttl, max_idle) return self._put_internal(key_data, value_data, ttl, max_idle)
[docs] def put_all(self, map: typing.Dict[KeyType, ValueType]) -> Future[None]: """Copies all the mappings from the specified map to this map. No atomicity guarantees are given. In the case of a failure, some key-value tuples may get written, while others are not. Args: map: Dictionary which includes mappings to be stored in this map. """ check_not_none(map, "map can't be None") if not map: return ImmediateFuture(None) partition_service = self._context.partition_service partition_map: typing.Dict[int, typing.List[typing.Tuple[Data, Data]]] = {} for key, value in map.items(): check_not_none(key, "key can't be None") check_not_none(value, "value can't be None") try: entry = (self._to_data(key), self._to_data(value)) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.put_all, map) partition_id = partition_service.get_partition_id(entry[0]) try: partition_map[partition_id].append(entry) except KeyError: partition_map[partition_id] = [entry] futures = [] for partition_id, entry_list in partition_map.items(): request = map_put_all_codec.encode_request( self.name, entry_list, False ) # TODO trigger map loader future = self._invoke_on_partition(request, partition_id) futures.append(future) return combine_futures(futures)
[docs] def put_if_absent( self, key: KeyType, value: ValueType, ttl: float = None, max_idle: float = None ) -> Future[typing.Optional[ValueType]]: """Associates the specified key with the given value if it is not already associated. If ttl is provided, entry will expire and get evicted after the ttl. This is equivalent to below, except that the action is performed atomically: >>> if not my_map.contains_key(key): >>> return my_map.put(key,value) >>> else: >>> return my_map.get(key) Warning: This method returns a clone of the previous value, not the original (identically equal) value previously put into the map. Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: key: Key of the entry. value: Value of the entry. ttl: Maximum time in seconds for this entry to stay in the map. If not provided, the value configured on the server side configuration will be used. Setting this to ``0`` means infinite time-to-live. max_idle: Maximum time in seconds for this entry to stay idle in the map. If not provided, the value configured on the server side configuration will be used. Setting this to ``0`` means infinite max idle time. Returns: Old value of the entry. """ check_not_none(key, "key can't be None") check_not_none(value, "value can't be None") try: key_data = self._to_data(key) value_data = self._to_data(value) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.put_if_absent, key, value, ttl, max_idle) return self._put_if_absent_internal(key_data, value_data, ttl, max_idle)
[docs] def put_transient( self, key: KeyType, value: ValueType, ttl: float = None, max_idle: float = None ) -> Future[None]: """Same as ``put``, but MapStore defined at the server side will not be called. Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: key: Key of the entry. value: Value of the entry. ttl: Maximum time in seconds for this entry to stay in the map. If not provided, the value configured on the server side configuration will be used. Setting this to ``0`` means infinite time-to-live. max_idle: Maximum time in seconds for this entry to stay idle in the map. If not provided, the value configured on the server side configuration will be used. Setting this to ``0`` means infinite max idle time. """ check_not_none(key, "key can't be None") check_not_none(value, "value can't be None") try: key_data = self._to_data(key) value_data = self._to_data(value) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.put_transient, key, value, ttl, max_idle) return self._put_transient_internal(key_data, value_data, ttl, max_idle)
[docs] def remove(self, key: KeyType) -> Future[typing.Optional[ValueType]]: """Removes the mapping for a key from this map if it is present. The map will not contain a mapping for the specified key once the call returns. Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: key: Key of the mapping to be deleted. Returns: The previous value associated with key, or ``None`` if there was no mapping for key. """ check_not_none(key, "key can't be None") try: key_data = self._to_data(key) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.remove, key) return self._remove_internal(key_data)
[docs] def remove_all(self, predicate: Predicate) -> Future[None]: """Removes all entries which match with the supplied predicate. Args: predicate: Used to select entries to be removed from map. """ check_not_none(predicate, "predicate can't be None") try: predicate_data = self._to_data(predicate) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.remove_all, predicate) return self._remove_all_internal(predicate_data)
[docs] def remove_if_same(self, key: KeyType, value: ValueType) -> Future[bool]: """Removes the entry for a key only if it is currently mapped to a given value. This is equivalent to below, except that the action is performed atomically: >>> if my_map.contains_key(key) and my_map.get(key) == value: >>> my_map.remove(key) >>> return True >>> else: >>> return False Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: key: The specified key. value: Remove the key if it has this value. Returns: ``True`` if the value was removed, ``False`` otherwise. """ check_not_none(key, "key can't be None") check_not_none(value, "value can't be None") try: key_data = self._to_data(key) value_data = self._to_data(value) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.remove_if_same, key, value) return self._remove_if_same_internal_(key_data, value_data)
[docs] def remove_entry_listener(self, registration_id: str) -> Future[bool]: """Removes the specified entry listener. Returns silently if there is no such listener added before. Args: registration_id: Id of registered listener. Returns: ``True`` if registration is removed, ``False`` otherwise. """ return self._deregister_listener(registration_id)
[docs] def remove_interceptor(self, registration_id: str) -> Future[bool]: """Removes the given interceptor for this map, so it will not intercept operations anymore. Args: registration_id: Registration ID of the map interceptor. Returns: ``True`` if the interceptor is removed, ``False`` otherwise. """ check_not_none(registration_id, "Interceptor registration id should not be None") request = map_remove_interceptor_codec.encode_request(self.name, registration_id) return self._invoke(request, map_remove_interceptor_codec.decode_response)
[docs] def replace(self, key: KeyType, value: ValueType) -> Future[typing.Optional[ValueType]]: """Replaces the entry for a key only if it is currently mapped to some value. This is equivalent to below, except that the action is performed atomically: >>> if my_map.contains_key(key): >>> return my_map.put(key,value) >>> else: >>> return None Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Warning: This method returns a clone of the previous value, not the original (identically equal) value previously put into the map. Args: key: The specified key. value: The value to replace the previous value. Returns: Previous value associated with key, or ``None`` if there was no mapping for key. """ check_not_none(key, "key can't be None") check_not_none(value, "value can't be None") try: key_data = self._to_data(key) value_data = self._to_data(value) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.replace, key, value) return self._replace_internal(key_data, value_data)
[docs] def replace_if_same( self, key: ValueType, old_value: ValueType, new_value: ValueType ) -> Future[bool]: """Replaces the entry for a key only if it is currently mapped to a given value. This is equivalent to below, except that the action is performed atomically: >>> if my_map.contains_key(key) and my_map.get(key) == old_value: >>> my_map.put(key, new_value) >>> return True >>> else: >>> return False Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: key: The specified key. old_value: Replace the key value if it is the old value. new_value: The new value to replace the old value. Returns: ``True`` if the value was replaced, ``False`` otherwise. """ check_not_none(key, "key can't be None") check_not_none(old_value, "old_value can't be None") check_not_none(new_value, "new_value can't be None") try: key_data = self._to_data(key) old_value_data = self._to_data(old_value) new_value_data = self._to_data(new_value) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.replace_if_same, key, old_value, new_value) return self._replace_if_same_internal(key_data, old_value_data, new_value_data)
[docs] def set( self, key: KeyType, value: ValueType, ttl: float = None, max_idle: float = None ) -> Future[None]: """Puts an entry into this map. Similar to the put operation except that set doesn't return the old value, which is more efficient. If ttl is provided, entry will expire and get evicted after the ttl. Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: key: Key of the entry. value: Value of the entry. ttl: Maximum time in seconds for this entry to stay in the map. If not provided, the value configured on the server side configuration will be used. Setting this to ``0`` means infinite time-to-live. max_idle: Maximum time in seconds for this entry to stay idle in the map. If not provided, the value configured on the server side configuration will be used. Setting this to ``0`` means infinite max idle time. """ check_not_none(key, "key can't be None") check_not_none(value, "value can't be None") try: key_data = self._to_data(key) value_data = self._to_data(value) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.set, key, value, ttl, max_idle) return self._set_internal(key_data, value_data, ttl, max_idle)
[docs] def set_ttl(self, key: KeyType, ttl: float) -> Future[None]: """Updates the TTL (time to live) value of the entry specified by the given key with a new TTL value. New TTL value is valid starting from the time this operation is invoked, not since the time the entry was created. If the entry does not exist or is already expired, this call has no effect. Args: key: The key of the map entry. ttl: Maximum time in seconds for this entry to stay in the map. Setting this to ``0`` means infinite time-to-live. """ check_not_none(key, "key can't be None") check_not_none(ttl, "ttl can't be None") try: key_data = self._to_data(key) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.set_ttl, key, ttl) return self._set_ttl_internal(key_data, ttl)
[docs] def size(self) -> Future[int]: """Returns the number of entries in this map. Returns: Number of entries in this map. """ request = map_size_codec.encode_request(self.name) return self._invoke(request, map_size_codec.decode_response)
[docs] def try_lock(self, key: KeyType, lease_time: float = None, timeout: float = 0) -> Future[bool]: """Tries to acquire the lock for the specified key. When the lock is not available: - If the timeout is not provided, the current thread doesn't wait and returns ``False`` immediately. - If the timeout is provided, the current thread becomes disabled for thread scheduling purposes and lies dormant until one of the followings happens: - The lock is acquired by the current thread, or - The specified waiting time elapses. If the lease time is provided, lock will be released after this time elapses. Args: key: Key to lock in this map. lease_time: Time in seconds to wait before releasing the lock. timeout: Maximum time in seconds to wait for the lock. Returns: ``True`` if the lock was acquired, ``False`` otherwise. """ check_not_none(key, "key can't be None") try: key_data = self._to_data(key) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.try_lock, key, lease_time, timeout) request = map_try_lock_codec.encode_request( self.name, key_data, thread_id(), to_millis(lease_time), to_millis(timeout), self._reference_id_generator.get_and_increment(), ) partition_id = self._context.partition_service.get_partition_id(key_data) invocation = Invocation( request, partition_id=partition_id, timeout=MAX_SIZE, response_handler=map_try_lock_codec.decode_response, ) self._invocation_service.invoke(invocation) return invocation.future
[docs] def try_put(self, key: KeyType, value: ValueType, timeout: float = 0) -> Future[bool]: """Tries to put the given key and value into this map and returns immediately if timeout is not provided. If timeout is provided, operation waits until it is completed or timeout is reached. Args: key: Key of the entry. value: Value of the entry. timeout: Maximum time in seconds to wait. Returns: ``True`` if the put is successful, ``False`` otherwise. """ check_not_none(key, "key can't be None") check_not_none(value, "value can't be None") try: key_data = self._to_data(key) value_data = self._to_data(value) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.try_put, key, value, timeout) return self._try_put_internal(key_data, value_data, timeout)
[docs] def try_remove(self, key: KeyType, timeout: float = 0) -> Future[bool]: """Tries to remove the given key from this map and returns immediately if timeout is not provided. If timeout is provided, operation waits until it is completed or timeout is reached. Args: key: Key of the entry to be deleted. timeout: Maximum time in seconds to wait. Returns: ``True`` if the remove is successful, ``False`` otherwise. """ check_not_none(key, "key can't be None") try: key_data = self._to_data(key) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.try_remove, key, timeout) return self._try_remove_internal(key_data, timeout)
[docs] def unlock(self, key: KeyType) -> Future[None]: """Releases the lock for the specified key. It never blocks and returns immediately. If the current thread is the holder of this lock, then the hold count is decremented. If the hold count is zero, then the lock is released. Args: key: The key to lock. """ check_not_none(key, "key can't be None") try: key_data = self._to_data(key) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.unlock, key) request = map_unlock_codec.encode_request( self.name, key_data, thread_id(), self._reference_id_generator.get_and_increment() ) return self._invoke_on_key(request, key_data)
[docs] def values(self, predicate: Predicate = None) -> Future[typing.List[ValueType]]: """Returns a list clone of the values contained in this map or values of the entries which are filtered with the predicate if provided. Warning: The list is NOT backed by the map, so changes to the map are NOT reflected in the list, and vice-versa. Args: predicate: Predicate to filter the entries. Returns: A list of clone of the values contained in this map. """ if predicate: if isinstance(predicate, _PagingPredicate): predicate.iteration_type = IterationType.VALUE try: holder = PagingPredicateHolder.of(predicate, self._to_data) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.values, predicate) def handler(message): response = map_values_with_paging_predicate_codec.decode_response(message) predicate.anchor_list = response["anchor_data_list"].as_anchor_list( self._to_object ) data_list = response["response"] return deserialize_list_in_place(data_list, self._to_object) request = map_values_with_paging_predicate_codec.encode_request(self.name, holder) else: try: predicate_data = self._to_data(predicate) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.values, predicate) def handler(message): data_list = map_values_with_predicate_codec.decode_response(message) return deserialize_list_in_place(data_list, self._to_object) request = map_values_with_predicate_codec.encode_request(self.name, predicate_data) else: def handler(message): data_list = map_values_codec.decode_response(message) return deserialize_list_in_place(data_list, self._to_object) request = map_values_codec.encode_request(self.name) return self._invoke(request, handler)
[docs] def blocking(self) -> "BlockingMap[KeyType, ValueType]": return BlockingMap(self)
# internals def _contains_key_internal(self, key_data): request = map_contains_key_codec.encode_request(self.name, key_data, thread_id()) return self._invoke_on_key(request, key_data, map_contains_key_codec.decode_response) def _get_internal(self, key_data): def handler(message): return self._to_object(map_get_codec.decode_response(message)) request = map_get_codec.encode_request(self.name, key_data, thread_id()) return self._invoke_on_key(request, key_data, handler) def _get_all_internal(self, partition_to_keys, futures=None): if futures is None: futures = [] def handler(message): entry_data_list = map_get_all_codec.decode_response(message) return deserialize_entry_list_in_place(entry_data_list, self._to_object) for partition_id, key_dict in partition_to_keys.items(): request = map_get_all_codec.encode_request(self.name, key_dict.values()) future = self._invoke_on_partition(request, partition_id, handler) futures.append(future) def merge(f): return dict(itertools.chain(*f.result())) return combine_futures(futures).continue_with(merge) def _remove_internal(self, key_data): def handler(message): return self._to_object(map_remove_codec.decode_response(message)) request = map_remove_codec.encode_request(self.name, key_data, thread_id()) return self._invoke_on_key(request, key_data, handler) def _remove_all_internal(self, predicate_data): request = map_remove_all_codec.encode_request(self.name, predicate_data) return self._invoke(request) def _remove_if_same_internal_(self, key_data, value_data): request = map_remove_if_same_codec.encode_request( self.name, key_data, value_data, thread_id() ) return self._invoke_on_key( request, key_data, response_handler=map_remove_if_same_codec.decode_response ) def _delete_internal(self, key_data): request = map_delete_codec.encode_request(self.name, key_data, thread_id()) return self._invoke_on_key(request, key_data) def _put_internal(self, key_data, value_data, ttl, max_idle): def handler(message): return self._to_object(map_put_codec.decode_response(message)) if max_idle is not None: request = map_put_with_max_idle_codec.encode_request( self.name, key_data, value_data, thread_id(), to_millis(ttl), to_millis(max_idle) ) else: request = map_put_codec.encode_request( self.name, key_data, value_data, thread_id(), to_millis(ttl) ) return self._invoke_on_key(request, key_data, handler) def _set_internal(self, key_data, value_data, ttl, max_idle): if max_idle is not None: request = map_set_with_max_idle_codec.encode_request( self.name, key_data, value_data, thread_id(), to_millis(ttl), to_millis(max_idle) ) else: request = map_set_codec.encode_request( self.name, key_data, value_data, thread_id(), to_millis(ttl) ) return self._invoke_on_key(request, key_data) def _set_ttl_internal(self, key_data, ttl): request = map_set_ttl_codec.encode_request(self.name, key_data, to_millis(ttl)) return self._invoke_on_key(request, key_data, map_set_ttl_codec.decode_response) def _try_remove_internal(self, key_data, timeout): request = map_try_remove_codec.encode_request( self.name, key_data, thread_id(), to_millis(timeout) ) return self._invoke_on_key(request, key_data, map_try_remove_codec.decode_response) def _try_put_internal(self, key_data, value_data, timeout): request = map_try_put_codec.encode_request( self.name, key_data, value_data, thread_id(), to_millis(timeout) ) return self._invoke_on_key(request, key_data, map_try_put_codec.decode_response) def _put_transient_internal(self, key_data, value_data, ttl, max_idle): if max_idle is not None: request = map_put_transient_with_max_idle_codec.encode_request( self.name, key_data, value_data, thread_id(), to_millis(ttl), to_millis(max_idle) ) else: request = map_put_transient_codec.encode_request( self.name, key_data, value_data, thread_id(), to_millis(ttl) ) return self._invoke_on_key(request, key_data) def _put_if_absent_internal(self, key_data, value_data, ttl, max_idle): def handler(message): return self._to_object(map_put_if_absent_codec.decode_response(message)) if max_idle is not None: request = map_put_if_absent_with_max_idle_codec.encode_request( self.name, key_data, value_data, thread_id(), to_millis(ttl), to_millis(max_idle) ) else: request = map_put_if_absent_codec.encode_request( self.name, key_data, value_data, thread_id(), to_millis(ttl) ) return self._invoke_on_key(request, key_data, handler) def _replace_if_same_internal(self, key_data, old_value_data, new_value_data): request = map_replace_if_same_codec.encode_request( self.name, key_data, old_value_data, new_value_data, thread_id() ) return self._invoke_on_key(request, key_data, map_replace_if_same_codec.decode_response) def _replace_internal(self, key_data, value_data): def handler(message): return self._to_object(map_replace_codec.decode_response(message)) request = map_replace_codec.encode_request(self.name, key_data, value_data, thread_id()) return self._invoke_on_key(request, key_data, handler) def _evict_internal(self, key_data): request = map_evict_codec.encode_request(self.name, key_data, thread_id()) return self._invoke_on_key(request, key_data, map_evict_codec.decode_response) def _load_all_internal(self, key_data_list, replace_existing_values): request = map_load_given_keys_codec.encode_request( self.name, key_data_list, replace_existing_values ) return self._invoke(request) def _execute_on_key_internal(self, key_data, entry_processor_data): def handler(message): return self._to_object(map_execute_on_key_codec.decode_response(message)) request = map_execute_on_key_codec.encode_request( self.name, entry_processor_data, key_data, thread_id() ) return self._invoke_on_key(request, key_data, handler)
class MapFeatNearCache(Map[KeyType, ValueType]): """Map proxy implementation featuring Near Cache""" def __init__(self, service_name, name, context): super(MapFeatNearCache, self).__init__(service_name, name, context) self._invalidation_listener_id = None self._near_cache = context.near_cache_manager.get_or_create_near_cache(name) if self._near_cache.invalidate_on_change: self._add_near_cache_invalidation_listener() def clear(self): self._near_cache._clear() return super(MapFeatNearCache, self).clear() def evict_all(self): self._near_cache.clear() return super(MapFeatNearCache, self).evict_all() def load_all(self, keys=None, replace_existing_values=True): if keys is None and replace_existing_values: self._near_cache.clear() return super(MapFeatNearCache, self).load_all(keys, replace_existing_values) def blocking(self) -> "BlockingMap[KeyType, ValueType]": return BlockingMap(self) def _on_destroy(self): self._remove_near_cache_invalidation_listener() self._near_cache.clear() super(MapFeatNearCache, self)._on_destroy() def _add_near_cache_invalidation_listener(self): codec = map_add_near_cache_invalidation_listener_codec request = codec.encode_request(self.name, EntryEventType.INVALIDATION, self._is_smart) self._invalidation_listener_id = self._register_listener( request, lambda r: codec.decode_response(r), lambda reg_id: map_remove_entry_listener_codec.encode_request(self.name, reg_id), lambda m: codec.handle(m, self._handle_invalidation, self._handle_batch_invalidation), ).result() def _remove_near_cache_invalidation_listener(self): if self._invalidation_listener_id: self.remove_entry_listener(self._invalidation_listener_id) def _handle_invalidation(self, key, source_uuid, partition_uuid, sequence): # key is always ``Data`` # null key means near cache has to remove all entries in it. # see MapAddNearCacheEntryListenerMessageTask. if key is None: self._near_cache._clear() else: self._invalidate_cache(key) def _handle_batch_invalidation(self, keys, source_uuids, partition_uuids, sequences): # key_list is always list of ``Data`` for key_data in keys: self._invalidate_cache(key_data) def _invalidate_cache(self, key_data): self._near_cache._invalidate(key_data) def _invalidate_cache_batch(self, key_data_list): for key_data in key_data_list: self._near_cache._invalidate(key_data) # internals def _contains_key_internal(self, key_data): try: return self._near_cache[key_data] except KeyError: return super(MapFeatNearCache, self)._contains_key_internal(key_data) def _get_internal(self, key_data): try: value = self._near_cache[key_data] return ImmediateFuture(value) except KeyError: future = super(MapFeatNearCache, self)._get_internal(key_data) return future.continue_with(self._update_cache, key_data) def _update_cache(self, f, key_data): self._near_cache.__setitem__(key_data, f.result()) return f.result() def _get_all_internal(self, partition_to_keys, futures=None): if futures is None: futures = [] for key_dic in partition_to_keys.values(): for key in list(key_dic.keys()): try: key_data = key_dic[key] value = self._near_cache[key_data] future = ImmediateFuture((key, value)) futures.append(future) del key_dic[key] except KeyError: pass return super(MapFeatNearCache, self)._get_all_internal(partition_to_keys, futures) def _try_remove_internal(self, key_data, timeout): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._try_remove_internal(key_data, timeout) def _try_put_internal(self, key_data, value_data, timeout): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._try_put_internal(key_data, value_data, timeout) def _set_internal(self, key_data, value_data, ttl, max_idle): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._set_internal(key_data, value_data, ttl, max_idle) def _set_ttl_internal(self, key_data, ttl): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._set_ttl_internal(key_data, ttl) def _replace_internal(self, key_data, value_data): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._replace_internal(key_data, value_data) def _replace_if_same_internal(self, key_data, old_value_data, new_value_data): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._replace_if_same_internal( key_data, old_value_data, new_value_data ) def _remove_internal(self, key_data): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._remove_internal(key_data) def _remove_all_internal(self, predicate_data): self._near_cache.clear() return super(MapFeatNearCache, self)._remove_all_internal(predicate_data) def _remove_if_same_internal_(self, key_data, value_data): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._remove_if_same_internal_(key_data, value_data) def _put_transient_internal(self, key_data, value_data, ttl, max_idle): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._put_transient_internal( key_data, value_data, ttl, max_idle ) def _put_internal(self, key_data, value_data, ttl, max_idle): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._put_internal(key_data, value_data, ttl, max_idle) def _put_if_absent_internal(self, key_data, value_data, ttl, max_idle): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._put_if_absent_internal( key_data, value_data, ttl, max_idle ) def _load_all_internal(self, key_data_list, replace_existing_values): self._invalidate_cache_batch(key_data_list) return super(MapFeatNearCache, self)._load_all_internal( key_data_list, replace_existing_values ) def _execute_on_key_internal(self, key_data, entry_processor_data): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._execute_on_key_internal( key_data, entry_processor_data ) def _evict_internal(self, key_data): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._evict_internal(key_data) def _delete_internal(self, key_data): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._delete_internal(key_data) class BlockingMap(Map[KeyType, ValueType]): __slots__ = ("_wrapped", "name", "service_name") def __init__( self, wrapped: typing.Union[Map[KeyType, ValueType], MapFeatNearCache[KeyType, ValueType]] ): self.name = wrapped.name self.service_name = wrapped.service_name self._wrapped = wrapped def add_entry_listener( # type: ignore[override] self, include_value: bool = False, key: KeyType = None, predicate: Predicate = None, added_func: EntryEventCallable = None, removed_func: EntryEventCallable = None, updated_func: EntryEventCallable = None, evicted_func: EntryEventCallable = None, evict_all_func: EntryEventCallable = None, clear_all_func: EntryEventCallable = None, merged_func: EntryEventCallable = None, expired_func: EntryEventCallable = None, loaded_func: EntryEventCallable = None, ) -> str: return self._wrapped.add_entry_listener( include_value, key, predicate, added_func, removed_func, updated_func, evicted_func, evict_all_func, clear_all_func, merged_func, expired_func, loaded_func, ).result() def add_index( # type: ignore[override] self, attributes: typing.Sequence[str] = None, index_type: typing.Union[int, str] = IndexType.SORTED, name: str = None, bitmap_index_options: typing.Dict[str, typing.Any] = None, ) -> None: return self._wrapped.add_index(attributes, index_type, name, bitmap_index_options).result() def add_interceptor( # type: ignore[override] self, interceptor: typing.Any, ) -> str: return self._wrapped.add_interceptor(interceptor).result() def aggregate( # type: ignore[override] self, aggregator: Aggregator[AggregatorResultType], predicate: Predicate = None, ) -> AggregatorResultType: return self._wrapped.aggregate(aggregator, predicate).result() def clear( # type: ignore[override] self, ) -> None: return self._wrapped.clear().result() def contains_key( # type: ignore[override] self, key: KeyType, ) -> bool: return self._wrapped.contains_key(key).result() def contains_value( # type: ignore[override] self, value: ValueType, ) -> bool: return self._wrapped.contains_value(value).result() def delete( # type: ignore[override] self, key: KeyType, ) -> None: return self._wrapped.delete(key).result() def entry_set( # type: ignore[override] self, predicate: Predicate = None, ) -> typing.List[typing.Tuple[KeyType, ValueType]]: return self._wrapped.entry_set(predicate).result() def evict( # type: ignore[override] self, key: KeyType, ) -> bool: return self._wrapped.evict(key).result() def evict_all( # type: ignore[override] self, ) -> None: return self._wrapped.evict_all().result() def execute_on_entries( # type: ignore[override] self, entry_processor: typing.Any, predicate: Predicate = None, ) -> typing.List[typing.Any]: return self._wrapped.execute_on_entries(entry_processor, predicate).result() def execute_on_key( # type: ignore[override] self, key: KeyType, entry_processor: typing.Any, ) -> typing.Any: return self._wrapped.execute_on_key(key, entry_processor).result() def execute_on_keys( # type: ignore[override] self, keys: typing.Sequence[KeyType], entry_processor: typing.Any, ) -> typing.List[typing.Any]: return self._wrapped.execute_on_keys(keys, entry_processor).result() def flush( # type: ignore[override] self, ) -> None: return self._wrapped.flush().result() def force_unlock( # type: ignore[override] self, key: KeyType, ) -> None: return self._wrapped.force_unlock(key).result() def get( # type: ignore[override] self, key: KeyType, ) -> typing.Optional[ValueType]: return self._wrapped.get(key).result() def get_all( # type: ignore[override] self, keys: typing.Sequence[KeyType], ) -> typing.Dict[KeyType, ValueType]: return self._wrapped.get_all(keys).result() def get_entry_view( # type: ignore[override] self, key: KeyType, ) -> SimpleEntryView[KeyType, ValueType]: return self._wrapped.get_entry_view(key).result() def is_empty( # type: ignore[override] self, ) -> bool: return self._wrapped.is_empty().result() def is_locked( # type: ignore[override] self, key: KeyType, ) -> bool: return self._wrapped.is_locked(key).result() def key_set( # type: ignore[override] self, predicate: Predicate = None, ) -> typing.List[ValueType]: return self._wrapped.key_set(predicate).result() def load_all( # type: ignore[override] self, keys: typing.Sequence[KeyType] = None, replace_existing_values: bool = True, ) -> None: return self._wrapped.load_all(keys, replace_existing_values).result() def lock( # type: ignore[override] self, key: KeyType, lease_time: float = None, ) -> None: return self._wrapped.lock(key, lease_time).result() def project( # type: ignore[override] self, projection: Projection[ProjectionType], predicate: Predicate = None, ) -> ProjectionType: return self._wrapped.project(projection, predicate).result() def put( # type: ignore[override] self, key: KeyType, value: ValueType, ttl: float = None, max_idle: float = None, ) -> typing.Optional[ValueType]: return self._wrapped.put(key, value, ttl, max_idle).result() def put_all( # type: ignore[override] self, map: typing.Dict[KeyType, ValueType], ) -> None: return self._wrapped.put_all(map).result() def put_if_absent( # type: ignore[override] self, key: KeyType, value: ValueType, ttl: float = None, max_idle: float = None, ) -> typing.Optional[ValueType]: return self._wrapped.put_if_absent(key, value, ttl, max_idle).result() def put_transient( # type: ignore[override] self, key: KeyType, value: ValueType, ttl: float = None, max_idle: float = None, ) -> None: return self._wrapped.put_transient(key, value, ttl, max_idle).result() def remove( # type: ignore[override] self, key: KeyType, ) -> typing.Optional[ValueType]: return self._wrapped.remove(key).result() def remove_all(self, predicate: Predicate) -> None: # type: ignore[override] return self._wrapped.remove_all(predicate).result() def remove_if_same( # type: ignore[override] self, key: KeyType, value: ValueType, ) -> bool: return self._wrapped.remove_if_same(key, value).result() def remove_entry_listener( # type: ignore[override] self, registration_id: str, ) -> bool: return self._wrapped.remove_entry_listener(registration_id).result() def remove_interceptor( # type: ignore[override] self, registration_id: str, ) -> bool: return self._wrapped.remove_interceptor(registration_id).result() def replace( # type: ignore[override] self, key: KeyType, value: ValueType, ) -> typing.Optional[ValueType]: return self._wrapped.replace(key, value).result() def replace_if_same( # type: ignore[override] self, key: ValueType, old_value: ValueType, new_value: ValueType, ) -> bool: return self._wrapped.replace_if_same(key, old_value, new_value).result() def set( # type: ignore[override] self, key: KeyType, value: ValueType, ttl: float = None, max_idle: float = None, ) -> None: return self._wrapped.set(key, value, ttl, max_idle).result() def set_ttl( # type: ignore[override] self, key: KeyType, ttl: float, ) -> None: return self._wrapped.set_ttl(key, ttl).result() def size( # type: ignore[override] self, ) -> int: return self._wrapped.size().result() def try_lock( # type: ignore[override] self, key: KeyType, lease_time: float = None, timeout: float = 0, ) -> bool: return self._wrapped.try_lock(key, lease_time, timeout).result() def try_put( # type: ignore[override] self, key: KeyType, value: ValueType, timeout: float = 0, ) -> bool: return self._wrapped.try_put(key, value, timeout).result() def try_remove( # type: ignore[override] self, key: KeyType, timeout: float = 0, ) -> bool: return self._wrapped.try_remove(key, timeout).result() def unlock( # type: ignore[override] self, key: KeyType, ) -> None: return self._wrapped.unlock(key).result() def values( # type: ignore[override] self, predicate: Predicate = None, ) -> typing.List[ValueType]: return self._wrapped.values(predicate).result() def blocking(self) -> "BlockingMap[KeyType, ValueType]": return self def destroy(self) -> bool: return self._wrapped.destroy() def __repr__(self) -> str: return self._wrapped.__repr__() def create_map_proxy(service_name, name, context): near_cache_config = context.config.near_caches.get(name, None) if near_cache_config is None: return Map(service_name, name, context) else: return MapFeatNearCache(service_name, name, context)