Source code for hazelcast.internal.asyncio_proxy.flake_id_generator

import asyncio
import collections
import itertools

from hazelcast.config import FlakeIdGeneratorConfig
from hazelcast.internal.asyncio_proxy.base import MAX_SIZE, Proxy
from hazelcast.protocol.codec import flake_id_generator_new_id_batch_codec
from hazelcast.util import current_time


[docs] class FlakeIdGenerator(Proxy): """A cluster-wide unique ID generator. Generated IDs are int values and are k-ordered (roughly ordered). IDs are in the range from 0 to 2^63 - 1. The IDs contain a timestamp component and a node ID component, which is assigned when the member joins the cluster. This allows the IDs to be ordered and unique without any coordination between members, which makes the generator safe even in split-brain scenario. Timestamp component is in milliseconds since 1.1.2018, 0:00 UTC and has 41 bits. This caps the useful lifespan of the generator to little less than 70 years (until ~2088). The sequence component is 6 bits. If more than 64 IDs are requested in single millisecond, IDs will gracefully overflow to the next millisecond and uniqueness is guaranteed in this case. The implementation does not allow overflowing by more than 15 seconds, if IDs are requested at higher rate, the call will block. Note, however, that clients are able to generate even faster because each call goes to a different (random) member and the 64 IDs/ms limit is for single member. Node ID overflow: It is possible to generate IDs on any member or client as long as there is at least one member with join version smaller than 2^16 in the cluster. The remedy is to restart the cluster: nodeId will be assigned from zero again. Uniqueness after the restart will be preserved thanks to the timestamp component. Warning: Asyncio client FlakeIdGenerator proxy is not thread-safe, do not access it from other threads. """ _BITS_NODE_ID = 16 _BITS_SEQUENCE = 6 def __init__(self, service_name, name, context): super().__init__(service_name, name, context) config = context.config.flake_id_generators.get(name, None) if config is None: config = FlakeIdGeneratorConfig() self._auto_batcher = _AutoBatcher( config.prefetch_count, config.prefetch_validity, self._new_id_batch )
[docs] async def new_id(self) -> int: """Generates and returns a cluster-wide unique ID. This method goes to a random member and gets a batch of IDs, which will then be returned locally for a limited time. The pre-fetch size and the validity time can be configured. Note: Values returned from this method may not be strictly ordered. Returns: New cluster-wide unique ID. Raises: HazelcastError: If node ID for all members in the cluster is out of valid range. See ``Node ID overflow`` note above. """ return await self._auto_batcher.new_id()
def _new_id_batch(self, batch_size) -> asyncio.Future: def handler(message): response = flake_id_generator_new_id_batch_codec.decode_response(message) return _IdBatch(response["base"], response["increment"], response["batch_size"]) request = flake_id_generator_new_id_batch_codec.encode_request(self.name, batch_size) return self._invoke(request, handler)
class _AutoBatcher: def __init__(self, batch_size, validity, id_generator): self._batch_size = batch_size self._validity = validity self._batch_id_supplier = id_generator self._block = _Block(_IdBatch(0, 0, 0), 0) self._id_queue = collections.deque() self._request_in_air = False async def new_id(self) -> int: loop = asyncio.get_running_loop() while True: block = self._block next_id = block.next_id() if next_id is not None: return next_id # In asyncio there is no preemption between non-await statements, # so block cannot change here. The check mirrors the threading # version for clarity. if block is not self._block: continue future = loop.create_future() self._id_queue.append(future) if not self._request_in_air: self._request_in_air = True self._request_new_batch() return await future def _request_new_batch(self): future = self._batch_id_supplier(self._batch_size) future.add_done_callback(self._assign_new_block) def _assign_new_block(self, future): try: new_batch_required = False id_batch = future.result() block = _Block(id_batch, self._validity) while True: try: f = self._id_queue.popleft() next_id = block.next_id() if next_id is not None: f.set_result(next_id) else: self._id_queue.appendleft(f) new_batch_required = True break except IndexError: break if new_batch_required: self._request_in_air = True self._request_new_batch() else: self._request_in_air = False self._block = block except Exception as ex: while True: try: f = self._id_queue.popleft() f.set_exception(ex) except IndexError: break self._request_in_air = False class _IdBatch: def __init__(self, base, increment, batch_size): self._base = base self._increment = increment self._batch_size = batch_size def __iter__(self): self._remaining = itertools.count(self._batch_size, -1) self._next_id = itertools.count(self._base, self._increment) return self def __next__(self): if next(self._remaining) <= 0: raise StopIteration return next(self._next_id) class _Block: def __init__(self, id_batch, validity): self._id_batch = id_batch self._iterator = iter(self._id_batch) self._invalid_since = validity + current_time() if validity > 0 else MAX_SIZE def next_id(self): if self._invalid_since <= current_time(): return None return next(self._iterator, None) async def create_flake_id_generator_proxy(service_name, name, context): return FlakeIdGenerator(service_name, name, context)