Source code for hazelcast.internal.asyncio_proxy.cp_manager

import asyncio

from hazelcast.cp import (
    _without_default_group_name,
    _get_object_name_for_proxy,
    ATOMIC_LONG_SERVICE,
    ATOMIC_REFERENCE_SERVICE,
    COUNT_DOWN_LATCH_SERVICE,
    SEMAPHORE_SERVICE,
    LOCK_SERVICE,
)
from hazelcast.internal.asyncio_invocation import Invocation
from hazelcast.internal.asyncio_proxy.atomic_long import AtomicLong
from hazelcast.internal.asyncio_proxy.atomic_reference import AtomicReference
from hazelcast.internal.asyncio_proxy.countdown_latch import CountDownLatch
from hazelcast.internal.asyncio_proxy.fenced_lock import FencedLock
from hazelcast.internal.asyncio_proxy.semaphore import (
    Semaphore,
    SessionAwareSemaphore,
    SessionlessSemaphore,
)
from hazelcast.protocol.codec import (
    cp_group_create_cp_group_codec,
    semaphore_get_semaphore_type_codec,
)


[docs] class CPSubsystem: """CP Subsystem is a component of Hazelcast that builds a strongly consistent layer for a set of distributed data structures. Its APIs can be used for implementing distributed coordination use cases, such as leader election, distributed locking, synchronization, and metadata management. Its data structures are CP with respect to the CAP principle, i.e., they always maintain linearizability and prefer consistency to availability during network partitions. Besides network partitions, CP Subsystem withstands server and client failures. Data structures in CP Subsystem run in CP groups. Each CP group elects its own Raft leader and runs the Raft consensus algorithm independently. The CP data structures differ from the other Hazelcast data structures in two aspects. First, an internal commit is performed on the METADATA CP group every time you fetch a proxy from this interface. Hence, callers should cache returned proxy objects. Second, if you call ``destroy()`` on a CP data structure proxy, that data structure is terminated on the underlying CP group and cannot be reinitialized until the CP group is force-destroyed. For this reason, please make sure that you are completely done with a CP data structure before destroying its proxy. """ def __init__(self, context): self._proxy_manager = CPProxyManager(context)
[docs] async def get_atomic_long(self, name: str) -> AtomicLong: """Returns the distributed AtomicLong instance with given name. The instance is created on CP Subsystem. If no group name is given within the ``name`` argument, then the AtomicLong instance will be created on the default CP group. If a group name is given, like ``.get_atomic_long("myLong@group1")``, the given group will be initialized first, if not initialized already, and then the instance will be created on this group. Args: name: Name of the AtomicLong. Returns: The AtomicLong proxy for the given name. """ return await self._proxy_manager.get_or_create(ATOMIC_LONG_SERVICE, name)
[docs] async def get_atomic_reference(self, name: str) -> AtomicReference: """Returns the distributed AtomicReference instance with given name. The instance is created on CP Subsystem. If no group name is given within the ``name`` argument, then the AtomicReference instance will be created on the DEFAULT CP group. If a group name is given, like ``.get_atomic_reference("myRef@group1")``, the given group will be initialized first, if not initialized already, and then the instance will be created on this group. Args: name: Name of the AtomicReference. Returns: The AtomicReference proxy for the given name. """ return await self._proxy_manager.get_or_create(ATOMIC_REFERENCE_SERVICE, name)
[docs] async def get_count_down_latch(self, name: str) -> CountDownLatch: """Returns the distributed CountDownLatch instance with given name. The instance is created on CP Subsystem. If no group name is given within the ``name`` argument, then the CountDownLatch instance will be created on the DEFAULT CP group. If a group name is given, like ``.get_count_down_latch("myLatch@group1")``, the given group will be initialized first, if not initialized already, and then the instance will be created on this group. Args: name: Name of the CountDownLatch. Returns: The CountDownLatch proxy for the given name. """ return await self._proxy_manager.get_or_create(COUNT_DOWN_LATCH_SERVICE, name)
[docs] async def get_lock(self, name: str) -> FencedLock: """Returns the distributed FencedLock instance with given name. The instance is created on CP Subsystem. If no group name is given within the ``name`` argument, then the FencedLock instance will be created on the DEFAULT CP group. If a group name is given, like ``.get_lock("myLock@group1")``, the given group will be initialized first, if not initialized already, and then the instance will be created on this group. Args: name: Name of the FencedLock Returns: The FencedLock proxy for the given name. """ return await self._proxy_manager.get_or_create(LOCK_SERVICE, name)
[docs] async def get_semaphore(self, name: str) -> Semaphore: """Returns the distributed Semaphore instance with given name. The instance is created on CP Subsystem. If no group name is given within the ``name`` argument, then the Semaphore instance will be created on the DEFAULT CP group. If a group name is given, like ``.get_semaphore("mySemaphore@group1")``, the given group will be initialized first, if not initialized already, and then the instance will be created on this group. Args: name: Name of the Semaphore Returns: The Semaphore proxy for the given name. """ return await self._proxy_manager.get_or_create(SEMAPHORE_SERVICE, name)
class CPProxyManager: def __init__(self, context): self._context = context self._lock_proxies = dict() # proxy_name to FencedLock self._mux = asyncio.Lock() # Guards the _lock_proxies async def get_or_create(self, service_name, proxy_name): proxy_name = _without_default_group_name(proxy_name) object_name = _get_object_name_for_proxy(proxy_name) group_id = await self._get_group_id(proxy_name) if service_name == ATOMIC_LONG_SERVICE: return AtomicLong(self._context, group_id, service_name, proxy_name, object_name) elif service_name == ATOMIC_REFERENCE_SERVICE: return AtomicReference(self._context, group_id, service_name, proxy_name, object_name) elif service_name == COUNT_DOWN_LATCH_SERVICE: return CountDownLatch(self._context, group_id, service_name, proxy_name, object_name) elif service_name == LOCK_SERVICE: return await self._create_fenced_lock(group_id, proxy_name, object_name) elif service_name == SEMAPHORE_SERVICE: return await self._create_semaphore(group_id, proxy_name, object_name) raise ValueError("Unknown service name: %s" % service_name) async def _create_fenced_lock(self, group_id, proxy_name, object_name): async with self._mux: proxy = self._lock_proxies.get(proxy_name, None) if proxy: if proxy.get_group_id() != group_id: self._lock_proxies.pop(proxy_name, None) else: return proxy proxy = FencedLock(self._context, group_id, LOCK_SERVICE, proxy_name, object_name) self._lock_proxies[proxy_name] = proxy return proxy async def _create_semaphore(self, group_id, proxy_name, object_name): codec = semaphore_get_semaphore_type_codec request = codec.encode_request(proxy_name) invocation = Invocation(request, response_handler=codec.decode_response) invocation_service = self._context.invocation_service jdk_compatible = await invocation_service.ainvoke(invocation) kls = SessionlessSemaphore if jdk_compatible else SessionAwareSemaphore return kls(self._context, group_id, SEMAPHORE_SERVICE, proxy_name, object_name) async def _get_group_id(self, proxy_name): codec = cp_group_create_cp_group_codec request = codec.encode_request(proxy_name) invocation = Invocation(request, response_handler=codec.decode_response) invocation_service = self._context.invocation_service return await invocation_service.ainvoke(invocation)