import itertools
import threading
import collections
from hazelcast.proxy.base import Proxy, MAX_SIZE
from hazelcast.config import FlakeIdGeneratorConfig
from hazelcast.util import current_time
from hazelcast.protocol.codec import flake_id_generator_new_id_batch_codec
from hazelcast.future import ImmediateFuture, Future
[docs]class FlakeIdGenerator(Proxy["BlockingFlakeIdGenerator"]):
"""A cluster-wide unique ID generator. Generated IDs are int values and
are k-ordered (roughly ordered). IDs are in the range from 0 to 2^63 - 1.
The IDs contain a timestamp component and a node ID component, which is
assigned when the member joins the cluster. This allows the IDs to be
ordered and unique without any coordination between members, which makes
the generator safe even in split-brain scenario.
Timestamp component is in milliseconds since 1.1.2018, 0:00 UTC and
has 41 bits. This caps the useful lifespan of the generator to little less
than 70 years (until ~2088). The sequence component is 6 bits. If more
than 64 IDs are requested in single millisecond, IDs will gracefully
overflow to the next millisecond and uniqueness is guaranteed in this case.
The implementation does not allow overflowing by more than 15 seconds, if
IDs are requested at higher rate, the call will block. Note, however, that
clients are able to generate even faster because each call goes to a
different (random) member and the 64 IDs/ms limit is for single member.
Node ID overflow:
It is possible to generate IDs on any member or client as long as there
is at least one member with join version smaller than 2^16 in the
cluster. The remedy is to restart the cluster: nodeId will be assigned
from zero again. Uniqueness after the restart will be preserved thanks
to the timestamp component.
"""
_BITS_NODE_ID = 16
_BITS_SEQUENCE = 6
def __init__(self, service_name, name, context):
super(FlakeIdGenerator, self).__init__(service_name, name, context)
config = context.config.flake_id_generators.get(name, None)
if config is None:
config = FlakeIdGeneratorConfig()
self._auto_batcher = _AutoBatcher(
config.prefetch_count, config.prefetch_validity, self._new_id_batch
)
[docs] def new_id(self) -> Future[int]:
"""Generates and returns a cluster-wide unique ID.
This method goes to a random member and gets a batch of IDs, which will
then be returned locally for a limited time. The pre-fetch size and
the validity time can be configured.
Note:
Values returned from this method may not be strictly ordered.
Returns:
New cluster-wide unique ID.
Raises:
HazelcastError: If node ID for all members in the cluster is out
of valid range. See ``Node ID overflow`` note above.
"""
return self._auto_batcher.new_id()
[docs] def blocking(self) -> "BlockingFlakeIdGenerator":
return BlockingFlakeIdGenerator(self)
def _new_id_batch(self, batch_size):
def handler(message):
response = flake_id_generator_new_id_batch_codec.decode_response(message)
return _IdBatch(response["base"], response["increment"], response["batch_size"])
request = flake_id_generator_new_id_batch_codec.encode_request(self.name, batch_size)
return self._invoke(request, handler)
[docs]class BlockingFlakeIdGenerator(FlakeIdGenerator):
__slots__ = ("_wrapped", "name", "service_name")
def __init__(self, wrapped: FlakeIdGenerator):
self.name = wrapped.name
self.service_name = wrapped.service_name
self._wrapped = wrapped
[docs] def new_id( # type: ignore[override]
self,
) -> int:
return self._wrapped.new_id().result()
[docs] def destroy(self) -> bool:
return self._wrapped.destroy()
[docs] def blocking(self) -> "BlockingFlakeIdGenerator":
return self
def __repr__(self) -> str:
return self._wrapped.__repr__()
class _AutoBatcher:
def __init__(self, batch_size, validity, id_generator):
self._batch_size = batch_size
self._validity = validity
self._batch_id_supplier = id_generator
self._block = _Block(_IdBatch(0, 0, 0), 0)
self._lock = threading.RLock()
self._id_queue = collections.deque()
self._request_in_air = False
def new_id(self):
while True:
block = self._block
next_id = block.next_id()
if next_id is not None:
return ImmediateFuture(next_id)
with self._lock:
# new block was assigned in the meantime
if block is not self._block:
continue
future = Future()
self._id_queue.append(future)
if not self._request_in_air:
self._request_in_air = True
self._request_new_batch()
return future
def _request_new_batch(self):
future = self._batch_id_supplier(self._batch_size)
future.add_done_callback(self._assign_new_block)
def _assign_new_block(self, future):
try:
new_batch_required = False
id_batch = future.result()
block = _Block(id_batch, self._validity)
with self._lock:
while True:
try:
f = self._id_queue.popleft()
next_id = block.next_id()
if next_id is not None:
f.set_result(next_id)
else:
self._id_queue.appendleft(f)
new_batch_required = True
break
except IndexError:
break
if new_batch_required:
self._request_in_air = True
self._request_new_batch()
else:
self._request_in_air = False
self._block = block
except Exception as ex:
with self._lock:
while True:
try:
f = self._id_queue.popleft()
f.set_exception(ex)
except IndexError:
break
self._request_in_air = False
class _IdBatch:
def __init__(self, base, increment, batch_size):
self._base = base
self._increment = increment
self._batch_size = batch_size
def __iter__(self):
self._remaining = itertools.count(self._batch_size, -1)
self._next_id = itertools.count(self._base, self._increment)
return self
def __next__(self):
if next(self._remaining) <= 0:
raise StopIteration
return next(self._next_id)
class _Block:
def __init__(self, id_batch, validity):
self._id_batch = id_batch
self._iterator = iter(self._id_batch)
self._invalid_since = validity + current_time() if validity > 0 else MAX_SIZE
def next_id(self):
if self._invalid_since <= current_time():
return None
return next(self._iterator, None)