Source code for hazelcast.proxy.replicated_map

from random import randint
from hazelcast.protocol.codec import (
    replicated_map_clear_codec,
    replicated_map_add_entry_listener_codec,
    replicated_map_add_entry_listener_to_key_codec,
    replicated_map_add_entry_listener_to_key_with_predicate_codec,
    replicated_map_add_entry_listener_with_predicate_codec,
    replicated_map_contains_key_codec,
    replicated_map_contains_value_codec,
    replicated_map_entry_set_codec,
    replicated_map_get_codec,
    replicated_map_is_empty_codec,
    replicated_map_key_set_codec,
    replicated_map_put_all_codec,
    replicated_map_put_codec,
    replicated_map_remove_codec,
    replicated_map_remove_entry_listener_codec,
    replicated_map_size_codec,
    replicated_map_values_codec,
)
from hazelcast.proxy.base import Proxy, EntryEvent, EntryEventType
from hazelcast.util import to_millis, check_not_none, ImmutableLazyDataList
from hazelcast import six


[docs]class ReplicatedMap(Proxy): """A ReplicatedMap is a map-like data structure with weak consistency and values locally stored on every node of the cluster. Whenever a value is written asynchronously, the new value will be internally distributed to all existing cluster members, and eventually every node will have the new value. When a new node joins the cluster, the new node initially will request existing values from older nodes and replicate them locally. """ def __init__(self, service_name, name, context): super(ReplicatedMap, self).__init__(service_name, name, context) partition_service = context.partition_service self._partition_id = randint(0, partition_service.partition_count - 1)
[docs] def add_entry_listener( self, key=None, predicate=None, added_func=None, removed_func=None, updated_func=None, evicted_func=None, clear_all_func=None, ): """Adds a continuous entry listener for this map. Listener will get notified for map events filtered with given parameters. Args: key: Key for filtering the events. predicate (hazelcast.predicate.Predicate): Predicate for filtering the events. added_func (function): Function to be called when an entry is added to map. removed_func (function): Function to be called when an entry is removed from map. updated_func (function): Function to be called when an entry is updated. evicted_func (function): Function to be called when an entry is evicted from map. clear_all_func (function): Function to be called when entries are cleared from map. Returns: hazelcast.future.Future[str]: A registration id which is used as a key to remove the listener. """ if key and predicate: codec = replicated_map_add_entry_listener_to_key_with_predicate_codec key_data = self._to_data(key) predicate_data = self._to_data(predicate) request = codec.encode_request(self.name, key_data, predicate_data, self._is_smart) elif key and not predicate: codec = replicated_map_add_entry_listener_to_key_codec key_data = self._to_data(key) request = codec.encode_request(self.name, key_data, self._is_smart) elif not key and predicate: codec = replicated_map_add_entry_listener_with_predicate_codec predicate = self._to_data(predicate) request = codec.encode_request(self.name, predicate, self._is_smart) else: codec = replicated_map_add_entry_listener_codec request = codec.encode_request(self.name, self._is_smart) def handle_event_entry( key, value, old_value, merging_value, event_type, uuid, number_of_affected_entries ): event = EntryEvent( self._to_object, key, value, old_value, merging_value, event_type, uuid, number_of_affected_entries, ) if event.event_type == EntryEventType.ADDED and added_func: added_func(event) elif event.event_type == EntryEventType.REMOVED and removed_func: removed_func(event) elif event.event_type == EntryEventType.UPDATED and updated_func: updated_func(event) elif event.event_type == EntryEventType.EVICTED and evicted_func: evicted_func(event) elif event.event_type == EntryEventType.CLEAR_ALL and clear_all_func: clear_all_func(event) return self._register_listener( request, lambda r: codec.decode_response(r), lambda reg_id: replicated_map_remove_entry_listener_codec.encode_request( self.name, reg_id ), lambda m: codec.handle(m, handle_event_entry), )
[docs] def clear(self): """Wipes data out of the replicated map. Returns: hazelcast.future.Future[None]: """ request = replicated_map_clear_codec.encode_request(self.name) return self._invoke(request)
[docs] def contains_key(self, key): """Determines whether this map contains an entry with the key. Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: key: The specified key. Returns: hazelcast.future.Future[bool]: ``True`` if this map contains an entry for the specified key, ``False`` otherwise. """ check_not_none(key, "key can't be None") key_data = self._to_data(key) request = replicated_map_contains_key_codec.encode_request(self.name, key_data) return self._invoke_on_key( request, key_data, replicated_map_contains_key_codec.decode_response )
[docs] def contains_value(self, value): """Determines whether this map contains one or more keys for the specified value. Args: value: The specified value. Returns: hazelcast.future.Future[bool]: ``True`` if this map contains an entry for the specified value, ``False`` otherwise. """ check_not_none(value, "value can't be None") value_data = self._to_data(value) request = replicated_map_contains_value_codec.encode_request(self.name, value_data) return self._invoke_on_partition( request, self._partition_id, replicated_map_contains_value_codec.decode_response )
[docs] def entry_set(self): """Returns a List clone of the mappings contained in this map. Warning: The list is NOT backed by the map, so changes to the map are NOT reflected in the list, and vice-versa. Returns: hazelcast.future.Future[list[tuple]]: The list of key-value tuples in the map. """ def handler(message): return ImmutableLazyDataList( replicated_map_entry_set_codec.decode_response(message), self._to_object ) request = replicated_map_entry_set_codec.encode_request(self.name) return self._invoke_on_partition(request, self._partition_id, handler)
[docs] def get(self, key): """Returns the value for the specified key, or ``None`` if this map does not contain this key. Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: key: The specified key. Returns: hazelcast.future.Future[any]: The value associated with the specified key. """ check_not_none(key, "key can't be None") def handler(message): return self._to_object(replicated_map_get_codec.decode_response(message)) key_data = self._to_data(key) request = replicated_map_get_codec.encode_request(self.name, key_data) return self._invoke_on_key(request, key_data, handler)
[docs] def is_empty(self): """Returns ``True`` if this map contains no key-value mappings. Returns: hazelcast.future.Future[bool]: ``True`` if this map contains no key-value mappings. """ request = replicated_map_is_empty_codec.encode_request(self.name) return self._invoke_on_partition( request, self._partition_id, replicated_map_is_empty_codec.decode_response )
[docs] def key_set(self): """Returns the list of keys in the ReplicatedMap. Warning: The list is NOT backed by the map, so changes to the map are NOT reflected in the list, and vice-versa. Returns: hazelcast.future.Future[list]: A list of the clone of the keys. """ def handler(message): return ImmutableLazyDataList( replicated_map_key_set_codec.decode_response(message), self._to_object ) request = replicated_map_key_set_codec.encode_request(self.name) return self._invoke_on_partition(request, self._partition_id, handler)
[docs] def put(self, key, value, ttl=0): """Associates the specified value with the specified key in this map. If the map previously contained a mapping for the key, the old value is replaced by the specified value. If ttl is provided, entry will expire and get evicted after the ttl. Args: key: The specified key. value: The value to associate with the key. ttl (int): Maximum time in seconds for this entry to stay, if not provided, the value configured on server side configuration will be used. Returns: hazelcast.future.Future[any]: Previous value associated with key or ``None`` if there was no mapping for key. """ check_not_none(key, "key can't be None") check_not_none(key, "value can't be None") def handler(message): return self._to_object(replicated_map_put_codec.decode_response(message)) key_data = self._to_data(key) value_data = self._to_data(value) request = replicated_map_put_codec.encode_request( self.name, key_data, value_data, to_millis(ttl) ) return self._invoke_on_key(request, key_data, handler)
[docs] def put_all(self, source): """Copies all of the mappings from the specified map to this map. No atomicity guarantees are given. In the case of a failure, some of the key-value tuples may get written, while others are not. Args: source (dict): Map which includes mappings to be stored in this map. Returns: hazelcast.future.Future[None]: """ entries = [] for key, value in six.iteritems(source): check_not_none(key, "key can't be None") check_not_none(value, "value can't be None") entries.append((self._to_data(key), self._to_data(value))) request = replicated_map_put_all_codec.encode_request(self.name, entries) return self._invoke(request)
[docs] def remove(self, key): """Removes the mapping for a key from this map if it is present. The map will not contain a mapping for the specified key once the call returns. Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form of the key, not the actual implementations of ``__hash__`` and ``__eq__`` defined in key's class. Args: key: Key of the mapping to be deleted. Returns: hazelcast.future.Future[any]: The previous value associated with key, or ``None`` if there was no mapping for key. """ check_not_none(key, "key can't be None") def handler(message): return self._to_object(replicated_map_remove_codec.decode_response(message)) key_data = self._to_data(key) request = replicated_map_remove_codec.encode_request(self.name, key_data) return self._invoke_on_key(request, key_data, handler)
[docs] def remove_entry_listener(self, registration_id): """Removes the specified entry listener. Returns silently if there is no such listener added before. Args: registration_id (str): Id of registered listener. Returns: hazelcast.future.Future[bool]: ``True`` if registration is removed, ``False`` otherwise. """ return self._deregister_listener(registration_id)
[docs] def size(self): """Returns the number of entries in this multimap. Returns: hazelcast.future.Future[int]: Number of entries in this multimap. """ request = replicated_map_size_codec.encode_request(self.name) return self._invoke_on_partition( request, self._partition_id, replicated_map_size_codec.decode_response )
[docs] def values(self): """Returns the list of values in the map. Warning: The returned list is NOT backed by the map, so changes to the map are NOT reflected in the list, and vice-versa. Returns: hazelcast.future.Future[list]: The list of values in the map. """ def handler(message): return ImmutableLazyDataList( replicated_map_values_codec.decode_response(message), self._to_object ) request = replicated_map_values_codec.encode_request(self.name) return self._invoke_on_partition(request, self._partition_id, handler)