Source code for hazelcast.util

import random
import threading
import time
import typing
import uuid

from hazelcast.serialization import UUID_MSB_SHIFT, UUID_LSB_MASK, UUID_MSB_MASK

if typing.TYPE_CHECKING:
    from hazelcast.serialization.data import Data

DEFAULT_ADDRESS = "127.0.0.1"
DEFAULT_PORT = 5701

MILLISECONDS_IN_SECONDS = 1000
NANOSECONDS_IN_SECONDS = 1e9


def check_not_none(val, message):
    if val is None:
        raise AssertionError(message)


def check_true(val, message):
    if not val:
        raise AssertionError(message)


def check_not_negative(val, message):
    if val < 0:
        raise AssertionError(message)


def check_not_empty(collection, message):
    if not collection:
        raise AssertionError(message)


def check_is_number(val, message="Number value expected"):
    if not isinstance(val, number_types):
        raise AssertionError(message)


def check_is_int(val, message="Int value expected"):
    if not isinstance(val, int):
        raise AssertionError(message)


def current_time():
    return time.time()


def current_time_in_millis():
    return to_millis(current_time())


def thread_id():
    return threading.currentThread().ident


def to_millis(seconds):
    if seconds is None:
        return -1
    return int(seconds * MILLISECONDS_IN_SECONDS)


def to_nanos(seconds):
    return int(seconds * NANOSECONDS_IN_SECONDS)


def validate_type(_type):
    if not isinstance(_type, type):
        raise ValueError("Serializer should be an instance of %s" % _type.__name__)


def validate_serializer(serializer, _type):
    if not issubclass(serializer, _type):
        raise ValueError("Serializer should be an instance of %s" % _type.__name__)


class AtomicInteger:
    """An Integer which can work atomically."""

    def __init__(self, initial: int = 0):
        self._mux = threading.RLock()
        self._counter = initial

    def get_and_increment(self) -> int:
        """Returns the current value and increment it.

        Returns:
            Current value of AtomicInteger.
        """
        with self._mux:
            res = self._counter
            self._counter += 1
            return res

    def increment_and_get(self) -> int:
        """Increments the current value and returns it.

        Returns:
            Incremented value of AtomicInteger.
        """
        with self._mux:
            self._counter += 1
            return self._counter

    def get(self) -> int:
        """Returns the current value.

        Returns:
            The current value.
        """
        with self._mux:
            return self._counter

    def add(self, count: int):
        """Adds the given value to the current value.

        Args:
            count: The value to add.
        """
        with self._mux:
            self._counter += count


# Serialization Utilities


def get_portable_version(portable, default_version):
    try:
        version = portable.get_class_version()
    except AttributeError:
        version = default_version
    return version


def deserialize_list_in_place(
    data_list: typing.List["Data"], to_object_fn: typing.Callable[["Data"], typing.Any]
) -> typing.List:
    for i in range(len(data_list)):
        data_list[i] = to_object_fn(data_list[i])

    return data_list


def deserialize_entry_list_in_place(
    entry_data_list: typing.List[typing.Tuple["Data", "Data"]],
    to_object_fn: typing.Callable[["Data"], typing.Any],
) -> typing.List[typing.Tuple[typing.Any, typing.Any]]:
    for i in range(len(entry_data_list)):
        item = entry_data_list[i]
        entry_data_list[i] = (to_object_fn(item[0]), to_object_fn(item[1]))

    return entry_data_list


# Version utilities
UNKNOWN_VERSION = -1
MAJOR_VERSION_MULTIPLIER = 10000
MINOR_VERSION_MULTIPLIER = 100


def calculate_version(version_str):
    if not version_str:
        return UNKNOWN_VERSION

    main_parts = version_str.split("-")
    tokens = main_parts[0].split(".")

    if len(tokens) < 2:
        return UNKNOWN_VERSION

    try:
        major_coeff = int(tokens[0])
        minor_coeff = int(tokens[1])

        calculated_version = (
            major_coeff * MAJOR_VERSION_MULTIPLIER + minor_coeff * MINOR_VERSION_MULTIPLIER
        )

        if len(tokens) > 2:
            patch_coeff = int(tokens[2])
            calculated_version += patch_coeff

        return calculated_version
    except ValueError:
        return UNKNOWN_VERSION


def to_list(*args, **kwargs):
    return list(*args, **kwargs)


def to_signed(unsigned, bit_len):
    mask = (1 << bit_len) - 1
    if unsigned & (1 << (bit_len - 1)):
        return unsigned | ~mask
    return unsigned & mask


def get_attr_name(cls, value):
    for attr_name, attr_value in vars(cls).items():
        if attr_value == value:
            return attr_name
    return None


def _get_enum_value(cls, key):
    if isinstance(key, str):
        return getattr(cls, key, None)
    return None


def try_to_get_enum_value(value, enum_class):
    if get_attr_name(enum_class, value):
        # If the value given by the user corresponds
        # to value of the one of the enum members,
        # it is okay to set it directly
        return value
    else:
        # We couldn't find a enum member name for the
        # given value. Try to match the given config
        # option with enum member names and get value
        # associated with it.
        enum_value = _get_enum_value(enum_class, value)
        if enum_value is not None:
            return enum_value
        else:
            raise TypeError(
                "%s must be equal to one of the values or "
                "names of the members of the %s" % (value, enum_class.__name__)
            )


number_types = (int, float)


[docs]class LoadBalancer: """Load balancer allows you to send operations to one of a number of endpoints (Members). It is up to the implementation to use different load balancing policies. If the client is configured with smart routing, only the operations that are not key based will be routed to the endpoint """
[docs] def init(self, cluster_service): """Initializes the load balancer. Args: cluster_service (hazelcast.cluster.ClusterService): The cluster service to select members from """ raise NotImplementedError("init")
[docs] def next(self): """Returns the next member to route to. Returns: hazelcast.core.MemberInfo: the next member or ``None`` if no member is available. """ raise NotImplementedError("next")
class _AbstractLoadBalancer(LoadBalancer): def __init__(self): self._cluster_service = None self._members = [] def init(self, cluster_service): self._cluster_service = cluster_service cluster_service.add_listener(self._listener, self._listener, True) def _listener(self, _): self._members = self._cluster_service.get_members()
[docs]class RoundRobinLB(_AbstractLoadBalancer): """A load balancer implementation that relies on using round robin to a next member to send a request to. Round robin is done based on best effort basis, the order of members for concurrent calls to the next() is not guaranteed. """ def __init__(self): super(RoundRobinLB, self).__init__() self._idx = 0
[docs] def next(self): members = self._members if not members: return None n = len(members) idx = self._idx % n self._idx += 1 return members[idx]
[docs]class RandomLB(_AbstractLoadBalancer): """A load balancer that selects a random member to route to."""
[docs] def next(self): members = self._members if not members: return None idx = random.randrange(0, len(members)) return members[idx]
class IterationType: """To differentiate users selection on result collection on map-wide operations like ``entry_set``, ``key_set``, ``values`` etc. """ KEY = 0 """Iterate over keys""" VALUE = 1 """Iterate over values""" ENTRY = 2 """Iterate over entries""" class UUIDUtil: @staticmethod def to_bits(value): i = value.int most_significant_bits = to_signed(i >> UUID_MSB_SHIFT, 64) least_significant_bits = to_signed(i & UUID_LSB_MASK, 64) return most_significant_bits, least_significant_bits @staticmethod def from_bits(most_significant_bits, least_significant_bits): return uuid.UUID( int=( ((most_significant_bits << UUID_MSB_SHIFT) & UUID_MSB_MASK) | (least_significant_bits & UUID_LSB_MASK) ) ) def int_from_bytes(buf): return int.from_bytes(buf, "big", signed=True) def int_to_bytes(number): # Number of bytes to represent the number. # For numbers that don't have exactly 8n bit_length, # adding 8 and performing integer division with 8 # let us get the correct length because # (8n + m + 8) // 8 = n + 0 + 1 (assuming m < 8). # For negative numbers, we add 1 to get rid of the # effects of the leading 1 (the sign bit). width = (8 + (number + (number < 0)).bit_length()) // 8 return number.to_bytes(length=width, byteorder="big", signed=True) def try_to_get_error_message(error): # If the error has a message attribute, # return it. If not, almost all of the # built-in errors (and Hazelcast Errors) # set the exception message as the first # parameter of args. If it is not there, # then return None. if hasattr(error, "message"): return error.message elif len(error.args) > 0: return error.args[0] return None def _is_same_version(v1, v2): # Ignores the patch version return v1.major == v2.major and v1.minor == v2.minor def _is_newer_version(v1, v2): # Ignores the patch version return v1.major > v2.major or (v1.major == v2.major and v1.minor > v2.minor) def member_of_larger_same_version_group(members): """Finds a larger same-version group of data members from a collection of members and return a random member from the group. If the same-version groups have the same size, return a member from the newer group. Args: members (list[hazelcast.core.MemberInfo]): List of all members. Returns: hazelcast.core.MemberInfo: The chosen member or ``None``, if no data member is found. """ # The members should have at most 2 different version (ignoring the patch # version). version0 = None version1 = None count0 = 0 count1 = 0 for member in members: if member.lite_member: continue v = member.version if not version0 or _is_same_version(version0, v): version0 = v count0 += 1 elif not version1 or _is_same_version(version1, v): version1 = v count1 += 1 else: raise ValueError( "More than 2 distinct member versions found: %s, %s, %s" % (version0, version1, v) ) # no data members if count0 == 0: return None if count0 > count1 or count0 == count1 and _is_newer_version(version0, version1): count = count0 version = version0 else: count = count0 version = version1 # return a random member from the larger group random_member_idx = random.randrange(0, count) for member in members: if not member.lite_member and _is_same_version(version, member.version): random_member_idx -= 1 if random_member_idx < 0: return member def re_raise(exception, traceback): if exception.__traceback__ is not traceback: raise exception.with_traceback(traceback) raise exception