import functools
import logging
import random
from hazelcast.future import Future
from hazelcast.proxy.base import Proxy
from hazelcast.cluster import VectorClock
from hazelcast.protocol.codec import (
pn_counter_add_codec,
pn_counter_get_codec,
pn_counter_get_configured_replica_count_codec,
)
from hazelcast.errors import NoDataMemberInClusterError
from hazelcast.six.moves import range
_logger = logging.getLogger(__name__)
[docs]class PNCounter(Proxy):
"""PN (Positive-Negative) CRDT counter.
The counter supports adding and subtracting values as well as
retrieving the current counter value.
Each replica of this counter can perform operations locally without
coordination with the other replicas, thus increasing availability.
The counter guarantees that whenever two nodes have received the
same set of updates, possibly in a different order, their state is
identical, and any conflicting updates are merged automatically.
If no new updates are made to the shared state, all nodes that can
communicate will eventually have the same data.
When invoking updates from the client, the invocation is remote.
This may lead to indeterminate state - the update may be applied but the
response has not been received. In this case, the caller will be notified
with a TargetDisconnectedError.
The read and write methods provide monotonic read and RYW (read-your-write)
guarantees. These guarantees are session guarantees which means that if
no replica with the previously observed state is reachable, the session
guarantees are lost and the method invocation will throw a
ConsistencyLostError. This does not mean
that an update is lost. All of the updates are part of some replica and
will be eventually reflected in the state of all other replicas. This
exception just means that you cannot observe your own writes because
all replicas that contain your updates are currently unreachable.
After you have received a ConsistencyLostError, you can either
wait for a sufficiently up-to-date replica to become reachable in which
case the session can be continued or you can reset the session by calling
the reset() method. If you have called the reset() method,
a new session is started with the next invocation to a CRDT replica.
Notes:
The CRDT state is kept entirely on non-lite (data) members. If there
aren't any and the methods here are invoked on a lite member, they will
fail with an NoDataMemberInClusterError.
"""
_EMPTY_ADDRESS_LIST = []
def __init__(self, service_name, name, context):
super(PNCounter, self).__init__(service_name, name, context)
self._observed_clock = VectorClock()
self._max_replica_count = 0
self._current_target_replica_address = None
[docs] def get(self):
"""Returns the current value of the counter.
Returns:
hazelcast.future.Future[int]: The current value of the counter.
Raises:
NoDataMemberInClusterError: if the cluster does not contain any data members.
ConsistencyLostError: if the session guarantees have been lost.
"""
return self._invoke_internal(pn_counter_get_codec)
[docs] def get_and_add(self, delta):
"""Adds the given value to the current value and returns the previous value.
Args:
delta (int): The value to add.
Returns:
hazelcast.future.Future[int]: The previous value.
Raises:
NoDataMemberInClusterError: if the cluster does not contain any data members.
ConsistencyLostError: if the session guarantees have been lost.
"""
return self._invoke_internal(pn_counter_add_codec, delta=delta, get_before_update=True)
[docs] def add_and_get(self, delta):
"""Adds the given value to the current value and returns the updated value.
Args:
delta (int): The value to add.
Returns:
hazelcast.future.Future[int]: The updated value.
Raises:
NoDataMemberInClusterError: if the cluster does not contain any data members.
ConsistencyLostError: if the session guarantees have been lost.
"""
return self._invoke_internal(pn_counter_add_codec, delta=delta, get_before_update=False)
[docs] def get_and_subtract(self, delta):
"""Subtracts the given value from the current value and returns the previous value.
Args:
delta (int): The value to subtract.
Returns:
hazelcast.future.Future[int]: The previous value.
Raises:
NoDataMemberInClusterError: if the cluster does not contain any data members.
ConsistencyLostError: if the session guarantees have been lost.
"""
return self._invoke_internal(pn_counter_add_codec, delta=-1 * delta, get_before_update=True)
[docs] def subtract_and_get(self, delta):
"""Subtracts the given value from the current value and returns the updated value.
Args:
delta (int): The value to subtract.
Returns:
hazelcast.future.Future[int]: The updated value.
Raises:
NoDataMemberInClusterError: if the cluster does not contain any data members.
ConsistencyLostError: if the session guarantees have been lost.
"""
return self._invoke_internal(
pn_counter_add_codec, delta=-1 * delta, get_before_update=False
)
[docs] def get_and_decrement(self):
"""Decrements the counter value by one and returns the previous value.
Returns:
hazelcast.future.Future[int]: The previous value.
Raises:
NoDataMemberInClusterError: if the cluster does not contain any data members.
ConsistencyLostError: if the session guarantees have been lost.
"""
return self._invoke_internal(pn_counter_add_codec, delta=-1, get_before_update=True)
[docs] def decrement_and_get(self):
"""Decrements the counter value by one and returns the updated value.
Returns:
hazelcast.future.Future[int]: The updated value.
Raises:
NoDataMemberInClusterError: if the cluster does not contain any data members.
ConsistencyLostError: if the session guarantees have been lost.
"""
return self._invoke_internal(pn_counter_add_codec, delta=-1, get_before_update=False)
[docs] def get_and_increment(self):
"""Increments the counter value by one and returns the previous value.
Returns:
hazelcast.future.Future[int]: The previous value.
Raises:
NoDataMemberInClusterError: if the cluster does not contain any data members.
ConsistencyLostError: if the session guarantees have been lost.
"""
return self._invoke_internal(pn_counter_add_codec, delta=1, get_before_update=True)
[docs] def increment_and_get(self):
"""Increments the counter value by one and returns the updated value.
Returns:
hazelcast.future.Future[int]: The updated value.
Raises:
NoDataMemberInClusterError: if the cluster does not contain any data members.
UnsupportedOperationError: if the cluster version is less than 3.10.
ConsistencyLostError: if the session guarantees have been lost.
"""
return self._invoke_internal(pn_counter_add_codec, delta=1, get_before_update=False)
[docs] def reset(self):
"""Resets the observed state by this PN counter.
This method may be used after a method invocation has thrown a ``ConsistencyLostError``
to reset the proxy and to be able to start a new session.
"""
self._observed_clock = VectorClock()
def _invoke_internal(self, codec, **kwargs):
delegated_future = Future()
self._set_result_or_error(
delegated_future, PNCounter._EMPTY_ADDRESS_LIST, None, codec, **kwargs
)
return delegated_future
def _set_result_or_error(
self, delegated_future, excluded_addresses, last_error, codec, **kwargs
):
target = self._get_crdt_operation_target(excluded_addresses)
if not target:
if last_error:
delegated_future.set_exception(last_error)
return
delegated_future.set_exception(
NoDataMemberInClusterError(
"Cannot invoke operations on a CRDT because "
"the cluster does not contain any data members"
)
)
return
request = codec.encode_request(
name=self.name,
replica_timestamps=self._observed_clock.entry_set(),
target_replica_uuid=target.uuid,
**kwargs
)
future = self._invoke_on_target(request, target.uuid, codec.decode_response)
checker_func = functools.partial(
self._check_invocation_result,
delegated_future=delegated_future,
excluded_addresses=excluded_addresses,
target=target,
codec=codec,
**kwargs
)
future.add_done_callback(checker_func)
def _check_invocation_result(
self, future, delegated_future, excluded_addresses, target, codec, **kwargs
):
try:
result = future.result()
self._update_observed_replica_timestamp(result["replica_timestamps"])
delegated_future.set_result(result["value"])
except Exception as ex:
_logger.exception(
"Exception occurred while invoking operation on target %s, "
"choosing different target",
target,
)
if excluded_addresses == PNCounter._EMPTY_ADDRESS_LIST:
excluded_addresses = []
excluded_addresses.append(target)
self._set_result_or_error(delegated_future, excluded_addresses, ex, codec, **kwargs)
def _get_crdt_operation_target(self, excluded_addresses):
if (
self._current_target_replica_address
and self._current_target_replica_address not in excluded_addresses
):
return self._current_target_replica_address
self._current_target_replica_address = self._choose_target_replica(excluded_addresses)
return self._current_target_replica_address
def _choose_target_replica(self, excluded_addresses):
replica_addresses = self._get_replica_addresses(excluded_addresses)
if len(replica_addresses) == 0:
return None
random_replica_index = random.randrange(0, len(replica_addresses))
return replica_addresses[random_replica_index]
def _get_replica_addresses(self, excluded_addresses):
data_members = self._context.cluster_service.get_members(
lambda member: not member.lite_member
)
replica_count = self._get_max_configured_replica_count()
current_count = min(replica_count, len(data_members))
replica_addresses = []
for i in range(current_count):
member_address = data_members[i]
if member_address not in excluded_addresses:
replica_addresses.append(member_address)
return replica_addresses
def _get_max_configured_replica_count(self):
if self._max_replica_count > 0:
return self._max_replica_count
request = pn_counter_get_configured_replica_count_codec.encode_request(self.name)
count = self._invoke(
request, pn_counter_get_configured_replica_count_codec.decode_response
).result()
self._max_replica_count = count
return self._max_replica_count
def _update_observed_replica_timestamp(self, observed_timestamps):
observed_clock = self._to_vector_clock(observed_timestamps)
if observed_clock.is_after(self._observed_clock):
self._observed_clock = observed_clock
def _to_vector_clock(self, timestamps):
vector_clock = VectorClock()
for replica_id, timestamp in timestamps:
vector_clock.set_replica_timestamp(replica_id, timestamp)
return vector_clock