Source code for hazelcast.util

import random
import threading
import time
from collections import Sequence, Iterable

from hazelcast import six

DEFAULT_ADDRESS = "127.0.0.1"
DEFAULT_PORT = 5701

MILLISECONDS_IN_SECONDS = 1000
NANOSECONDS_IN_SECONDS = 1e9


def check_not_none(val, message):
    if val is None:
        raise AssertionError(message)


def check_true(val, message):
    if not val:
        raise AssertionError(message)


def check_not_negative(val, message):
    if val < 0:
        raise AssertionError(message)


def check_not_empty(collection, message):
    if not collection:
        raise AssertionError(message)


def check_is_number(val):
    if not isinstance(val, number_types):
        raise AssertionError("Number value expected")


def check_is_int(val):
    if not isinstance(val, six.integer_types):
        raise AssertionError("Int value expected")


def current_time():
    return time.time()


def current_time_in_millis():
    return to_millis(current_time())


def thread_id():
    return threading.currentThread().ident


def to_millis(seconds):
    if seconds is None:
        return -1
    return int(seconds * MILLISECONDS_IN_SECONDS)


def to_nanos(seconds):
    return int(seconds * NANOSECONDS_IN_SECONDS)


def validate_type(_type):
    if not isinstance(_type, type):
        raise ValueError("Serializer should be an instance of %s" % _type.__name__)


def validate_serializer(serializer, _type):
    if not issubclass(serializer, _type):
        raise ValueError("Serializer should be an instance of %s" % _type.__name__)


class AtomicInteger(object):
    """An Integer which can work atomically."""

    def __init__(self, initial=0):
        self._mux = threading.RLock()
        self._counter = initial

    def get_and_increment(self):
        """Returns the current value and increment it.

        Returns:
            int: Current value of AtomicInteger.
        """
        with self._mux:
            res = self._counter
            self._counter += 1
            return res

    def get(self):
        """Returns the current value.

        Returns:
            int: The current value.
        """
        with self._mux:
            return self._counter

    def add(self, count):
        """Adds the given value to the current value.
        Args:
            count (int): The value to add.
        """
        with self._mux:
            self._counter += count


class ImmutableLazyDataList(Sequence):
    def __init__(self, list_data, to_object):
        super(ImmutableLazyDataList, self).__init__()
        self._list_data = list_data
        self._list_obj = [None] * len(self._list_data)
        self.to_object = to_object

    def __contains__(self, value):
        return super(ImmutableLazyDataList, self).__contains__(value)

    def __len__(self):
        return self._list_data.__len__()

    def __getitem__(self, index):
        val = self._list_obj[index]
        if not val:
            data = self._list_data[index]
            if isinstance(data, tuple):
                (key, value) = data
                self._list_obj[index] = (self.to_object(key), self.to_object(value))
            else:
                self._list_obj[index] = self.to_object(data)
        return self._list_obj[index]

    def __eq__(self, other):
        if not isinstance(other, Iterable):
            return False
        self._populate()
        return self._list_obj == other

    def _populate(self):
        for index, data in enumerate(self._list_data):
            if not self._list_obj[index]:
                self.__getitem__(index)

    def __repr__(self):
        self._populate()
        return str(self._list_obj)


# Serialization Utilities


def get_portable_version(portable, default_version):
    try:
        version = portable.get_class_version()
    except AttributeError:
        version = default_version
    return version


# Version utilities
UNKNOWN_VERSION = -1
MAJOR_VERSION_MULTIPLIER = 10000
MINOR_VERSION_MULTIPLIER = 100


def calculate_version(version_str):
    if not version_str:
        return UNKNOWN_VERSION

    main_parts = version_str.split("-")
    tokens = main_parts[0].split(".")

    if len(tokens) < 2:
        return UNKNOWN_VERSION

    try:
        major_coeff = int(tokens[0])
        minor_coeff = int(tokens[1])

        calculated_version = (
            major_coeff * MAJOR_VERSION_MULTIPLIER + minor_coeff * MINOR_VERSION_MULTIPLIER
        )

        if len(tokens) > 2:
            patch_coeff = int(tokens[2])
            calculated_version += patch_coeff

        return calculated_version
    except ValueError:
        return UNKNOWN_VERSION


def to_list(*args, **kwargs):
    return list(*args, **kwargs)


def to_signed(unsigned, bit_len):
    mask = (1 << bit_len) - 1
    if unsigned & (1 << (bit_len - 1)):
        return unsigned | ~mask
    return unsigned & mask


def get_attr_name(cls, value):
    for attr_name, attr_value in six.iteritems(vars(cls)):
        if attr_value == value:
            return attr_name
    return None


def _get_enum_value(cls, key):
    if isinstance(key, six.string_types):
        return getattr(cls, key, None)
    return None


def try_to_get_enum_value(value, enum_class):
    if get_attr_name(enum_class, value):
        # If the value given by the user corresponds
        # to value of the one of the enum members,
        # it is okay to set it directly
        return value
    else:
        # We couldn't find a enum member name for the
        # given value. Try to match the given config
        # option with enum member names and get value
        # associated with it.
        enum_value = _get_enum_value(enum_class, value)
        if enum_value is not None:
            return enum_value
        else:
            raise TypeError(
                "%s must be equal to one of the values or "
                "names of the members of the %s" % (value, enum_class.__name__)
            )


number_types = (six.integer_types, float)
none_type = type(None)


[docs]class LoadBalancer(object): """Load balancer allows you to send operations to one of a number of endpoints (Members). It is up to the implementation to use different load balancing policies. If the client is configured with smart routing, only the operations that are not key based will be routed to the endpoint """
[docs] def init(self, cluster_service): """Initializes the load balancer. Args: cluster_service (hazelcast.cluster.ClusterService): The cluster service to select members from """ raise NotImplementedError("init")
[docs] def next(self): """Returns the next member to route to. Returns: hazelcast.core.MemberInfo: the next member or ``None`` if no member is available. """ raise NotImplementedError("next")
class _AbstractLoadBalancer(LoadBalancer): def __init__(self): self._cluster_service = None self._members = [] def init(self, cluster_service): self._cluster_service = cluster_service cluster_service.add_listener(self._listener, self._listener, True) def _listener(self, _): self._members = self._cluster_service.get_members()
[docs]class RoundRobinLB(_AbstractLoadBalancer): """A load balancer implementation that relies on using round robin to a next member to send a request to. Round robin is done based on best effort basis, the order of members for concurrent calls to the next() is not guaranteed. """ def __init__(self): super(RoundRobinLB, self).__init__() self._idx = 0
[docs] def next(self): members = self._members if not members: return None n = len(members) idx = self._idx % n self._idx += 1 return members[idx]
[docs]class RandomLB(_AbstractLoadBalancer): """A load balancer that selects a random member to route to."""
[docs] def next(self): members = self._members if not members: return None idx = random.randrange(0, len(members)) return members[idx]
class IterationType: """To differentiate users selection on result collection on map-wide operations like ``entry_set``, ``key_set``, ``values`` etc. """ KEY = 0 """Iterate over keys""" VALUE = 1 """Iterate over values""" ENTRY = 2 """Iterate over entries"""