Source code for hazelcast.internal.asyncio_client

import asyncio
import logging
import sys
import typing

from hazelcast.internal.asyncio_cluster import ClusterService, _InternalClusterService
from hazelcast.internal.asyncio_compact import CompactSchemaService
from hazelcast.config import Config, IndexConfig
from hazelcast.internal.asyncio_connection import ConnectionManager, DefaultAsyncioAddressProvider
from hazelcast.core import DistributedObjectEvent
from hazelcast.discovery import HazelcastCloudAddressProvider
from hazelcast.errors import IllegalStateError, InvalidConfigurationError
from hazelcast.internal.asyncio_invocation import InvocationService, Invocation
from hazelcast.internal.asyncio_proxy.cp_manager import CPSubsystem
from hazelcast.internal.asyncio_proxy.pn_counter import PNCounter
from hazelcast.internal.asyncio_proxy.vector_collection import VectorCollection
from hazelcast.internal.asyncio_sql import _InternalSqlService, SqlService
from hazelcast.lifecycle import LifecycleService, LifecycleState, _InternalLifecycleService
from hazelcast.internal.asyncio_listener import ClusterViewListenerService, ListenerService
from hazelcast.near_cache import NearCacheManager
from hazelcast.internal.asyncio_partition import PartitionService, InternalPartitionService
from hazelcast.protocol.codec import (
    client_add_distributed_object_listener_codec,
    client_remove_distributed_object_listener_codec,
    dynamic_config_add_vector_collection_config_codec,
)
from hazelcast.internal.asyncio_proxy.manager import (
    EXECUTOR_SERVICE,
    LIST_SERVICE,
    MAP_SERVICE,
    MULTI_MAP_SERVICE,
    ProxyManager,
    QUEUE_SERVICE,
    RELIABLE_TOPIC_SERVICE,
    REPLICATED_MAP_SERVICE,
    RINGBUFFER_SERVICE,
    SET_SERVICE,
    TOPIC_SERVICE,
    VECTOR_SERVICE,
    PN_COUNTER_SERVICE,
)
from hazelcast.internal.asyncio_proxy.base import Proxy
from hazelcast.internal.asyncio_proxy.executor import Executor
from hazelcast.internal.asyncio_proxy.list import List
from hazelcast.internal.asyncio_proxy.map import Map
from hazelcast.internal.asyncio_proxy.multi_map import MultiMap
from hazelcast.internal.asyncio_proxy.queue import Queue
from hazelcast.internal.asyncio_proxy.reliable_topic import ReliableTopic
from hazelcast.internal.asyncio_proxy.replicated_map import ReplicatedMap
from hazelcast.internal.asyncio_proxy.ringbuffer import Ringbuffer
from hazelcast.internal.asyncio_proxy.set import Set
from hazelcast.internal.asyncio_proxy.topic import Topic
from hazelcast.internal.asyncio_reactor import AsyncioReactor
from hazelcast.serialization import SerializationServiceV1
from hazelcast.internal.asyncio_statistics import Statistics
from hazelcast.types import KeyType, MessageType, ValueType, ItemType
from hazelcast.util import AtomicInteger, RoundRobinLB

__all__ = ("HazelcastClient",)

_logger = logging.getLogger(__name__)


[docs] class HazelcastClient: """Hazelcast client instance to access and manipulate distributed data structures on the Hazelcast clusters. The client can be configured either by: - providing a configuration object as the first parameter of the constructor .. code:: python from hazelcast.asyncio import HazelcastClient from hazelcast.config import Config config = Config() config.cluster_name = "a-cluster" client = await HazelcastClient.create_and_start(config) - passing configuration options as keyword arguments .. code:: python from hazelcast.asyncio import HazelcastClient client = await HazelcastClient.create_and_start( cluster_name="a-cluster", ) Warning: Asyncio client is not thread-safe, do not access it from other threads. See the :class:`hazelcast.config.Config` documentation for the possible configuration options. """ _CLIENT_ID = AtomicInteger()
[docs] @classmethod async def create_and_start(cls, config: Config | None = None, **kwargs) -> "HazelcastClient": """Creates a HazelcastClient instance, and starts it. .. code:: python from hazelcast.asyncio import HazelcastClient client = await HazelcastClient.create_and_start() See the :class:`hazelcast.config.Config` documentation for the possible configuration options. Args: config: Optional configuration object. **kwargs: Optional keyword arguments of the client configuration. """ client = HazelcastClient(config, **kwargs) await client._start() return client
def __init__(self, config: Config | None = None, **kwargs): """Creates a HazelcastClient instance. This call just creates the instance, without starting it. The preferred way of creating and starting the client instance is using the ``create_and_start`` method: .. code:: python from hazelcast.asyncio import HazelcastClient client = await HazelcastClient.create_and_start() See the :class:`hazelcast.config.Config` documentation for the possible configuration options. Args: config: Optional configuration object. **kwargs: Optional keyword arguments of the client configuration. """ if config: if kwargs: raise InvalidConfigurationError( "Ambiguous client configuration is found. Either provide " "the config object as the only parameter, or do not " "pass it and use keyword arguments to configure the " "client." ) else: config = Config.from_dict(kwargs) self._config = config self._context = _ClientContext() client_id = HazelcastClient._CLIENT_ID.get_and_increment() self._name = self._create_client_name(client_id) self._reactor = AsyncioReactor() self._serialization_service = SerializationServiceV1(config) self._near_cache_manager = NearCacheManager(config, self._serialization_service) self._internal_lifecycle_service = _InternalLifecycleService(config) self._lifecycle_service = LifecycleService(self._internal_lifecycle_service) self._internal_cluster_service = _InternalClusterService(self, config) self._cluster_service = ClusterService(self._internal_cluster_service) self._invocation_service = InvocationService(self, config, self._reactor) self._compact_schema_service = CompactSchemaService( self._serialization_service.compact_stream_serializer, self._invocation_service, self._cluster_service, self._reactor, self._config, ) self._address_provider = self._create_address_provider() self._internal_partition_service = InternalPartitionService(self) self._partition_service = PartitionService( self._internal_partition_service, self._serialization_service, self._compact_schema_service.send_schema_and_retry, ) self._connection_manager = ConnectionManager( self, config, self._reactor, self._address_provider, self._internal_lifecycle_service, self._internal_partition_service, self._internal_cluster_service, self._invocation_service, self._near_cache_manager, self._send_state_to_cluster, ) self._internal_sql_service = _InternalSqlService( self._connection_manager, self._serialization_service, self._invocation_service, self._compact_schema_service.send_schema_and_retry, ) self._sql_service = SqlService(self._internal_sql_service) self._load_balancer = self._init_load_balancer(config) self._listener_service = ListenerService( self, config, self._connection_manager, self._invocation_service, self._compact_schema_service, ) self._proxy_manager = ProxyManager(self._context) self._cp_subsystem = CPSubsystem(self._context) self._lock_reference_id_generator = AtomicInteger(1) self._statistics = Statistics( self, config, self._reactor, self._connection_manager, self._invocation_service, self._near_cache_manager, ) self._cluster_view_listener = ClusterViewListenerService( self, self._connection_manager, self._internal_partition_service, self._internal_cluster_service, self._invocation_service, ) self._shutdown_lock = asyncio.Lock() self._invocation_service.init( self._internal_partition_service, self._connection_manager, self._listener_service, self._compact_schema_service, ) self._init_context() def _init_context(self): self._context.init_context( self, self._config, self._invocation_service, self._internal_partition_service, self._internal_cluster_service, self._connection_manager, self._serialization_service, self._listener_service, self._proxy_manager, self._near_cache_manager, self._lock_reference_id_generator, self._name, self._reactor, self._compact_schema_service, ) async def _start(self): try: self._internal_lifecycle_service.start() await self._invocation_service.start() membership_listeners = self._config.membership_listeners self._internal_cluster_service.start(self._connection_manager, membership_listeners) self._cluster_view_listener.start() await self._connection_manager.start(self._load_balancer) sync_start = not self._config.async_start if sync_start: await self._internal_cluster_service.wait_initial_member_list_fetched() await self._connection_manager.connect_to_all_cluster_members(sync_start) self._listener_service.start() await self._invocation_service.add_backup_listener() self._load_balancer.init(self._cluster_service) await self._statistics.start() except Exception: await self.shutdown() raise _logger.info("Client started")
[docs] async def get_executor(self, name: str) -> Executor: """Returns the executor instance with the specified name. Args: name: Name of the executor. Returns: Executor instance with the specified name. """ return await self._proxy_manager.get_or_create(EXECUTOR_SERVICE, name)
[docs] async def get_list(self, name: str) -> List[KeyType]: """Returns the distributed list instance with the specified name. Args: name: Name of the distributed list. Returns: Distributed list instance with the specified name. """ return await self._proxy_manager.get_or_create(LIST_SERVICE, name)
[docs] async def get_map(self, name: str) -> Map[KeyType, ValueType]: """Returns the distributed map instance with the specified name. Args: name: Name of the distributed map. Returns: Distributed map instance with the specified name. """ return await self._proxy_manager.get_or_create(MAP_SERVICE, name)
[docs] async def get_multi_map(self, name: str) -> MultiMap[KeyType, ValueType]: """Returns the distributed MultiMap instance with the specified name. Args: name: Name of the distributed MultiMap. Returns: Distributed MultiMap instance with the specified name. """ return await self._proxy_manager.get_or_create(MULTI_MAP_SERVICE, name)
[docs] async def get_queue(self, name: str) -> Queue[KeyType]: """Returns the distributed queue instance with the specified name. Args: name: Name of the distributed queue. Returns: Distributed queue instance with the specified name. """ return await self._proxy_manager.get_or_create(QUEUE_SERVICE, name)
[docs] async def get_set(self, name: str) -> Set[KeyType]: """Returns the distributed set instance with the specified name. Args: name: Name of the distributed set. Returns: Distributed set instance with the specified name. """ return await self._proxy_manager.get_or_create(SET_SERVICE, name)
[docs] async def get_replicated_map(self, name: str) -> ReplicatedMap[KeyType, ValueType]: """Returns the distributed ReplicatedMap instance with the specified name. Args: name: Name of the distributed replicated map. Returns: Distributed ReplicatedMap instance with the specified name. """ return await self._proxy_manager.get_or_create(REPLICATED_MAP_SERVICE, name)
[docs] async def get_reliable_topic(self, name: str) -> ReliableTopic: """Returns the ReliableTopic instance with the specified name. Args: name: Name of the ReliableTopic. Returns: Distributed ReliableTopic instance with the specified name. """ return await self._proxy_manager.get_or_create(RELIABLE_TOPIC_SERVICE, name)
[docs] async def get_ringbuffer(self, name: str) -> Ringbuffer: """Returns the distributed Ringbuffer instance with the specified name. Args: name: Name of the distributed ringbuffer. Returns: Distributed Ringbuffer instance with the specified name. """ return await self._proxy_manager.get_or_create(RINGBUFFER_SERVICE, name)
[docs] async def get_pn_counter(self, name: str) -> PNCounter: """Returns the PN Counter instance with the specified name. Args: name: Name of the PN Counter. Returns: Distributed PN Counter instance with the specified name. """ return await self._proxy_manager.get_or_create(PN_COUNTER_SERVICE, name)
[docs] async def get_topic(self, name: str) -> Topic[MessageType]: """Returns the distributed topic instance with the specified name. Args: name: Name of the distributed topic. Returns: Distributed topic instance with the specified name. """ return await self._proxy_manager.get_or_create(TOPIC_SERVICE, name)
[docs] async def create_vector_collection_config( self, name: str, indexes: typing.List[IndexConfig], backup_count: int = 1, async_backup_count: int = 0, split_brain_protection_name: typing.Optional[str] = None, merge_policy: str = "PutIfAbsentMergePolicy", merge_batch_size: int = 100, ) -> None: """Creates a vector collection with the given configuration. Args: name: Name of the distributed map. indexes: One or more index configurations. The index names must be unique. backup_count: Number of backups to keep for the vector collection. split_brain_protection_name: Name of the split brain protection configuration. See https://docs.hazelcast.com/hazelcast/5.6/data-structures/vector-collections#split-brain-protection merge_policy: The merge policy to use while recovering in a split brain situation. See https://docs.hazelcast.com/hazelcast/5.6/data-structures/vector-collections#merge-policy """ # check that indexes have different names if indexes: index_names = set(index.name for index in indexes) if len(index_names) != len(indexes): raise AssertionError("index names must be unique") request = dynamic_config_add_vector_collection_config_codec.encode_request( name, indexes, backup_count, async_backup_count, split_brain_protection_name, merge_policy, merge_batch_size, ) invocation = Invocation(request, response_handler=lambda m: m) await self._invocation_service.ainvoke(invocation)
[docs] async def get_vector_collection(self, name: str) -> VectorCollection: """Returns the vector collection instance with the specified name. Args: name: Name of the vector collection. Returns: Vector collection instance with the specified name. """ return await self._proxy_manager.get_or_create(VECTOR_SERVICE, name)
[docs] async def add_distributed_object_listener( self, listener_func: typing.Callable[[DistributedObjectEvent], None] ) -> str: """Adds a listener which will be notified when a new distributed object is created or destroyed. Args: listener_func: Function to be called when a distributed object is created or destroyed. Returns: A registration id which is used as a key to remove the listener. """ is_smart = self._config.smart_routing codec = client_add_distributed_object_listener_codec request = codec.encode_request(is_smart) def handle_distributed_object_event(name, service_name, event_type, source): event = DistributedObjectEvent(name, service_name, event_type, source) listener_func(event) def event_handler(client_message): return codec.handle(client_message, handle_distributed_object_event) return await self._listener_service.register_listener( request, codec.decode_response, client_remove_distributed_object_listener_codec.encode_request, event_handler, )
[docs] async def remove_distributed_object_listener(self, registration_id: str) -> bool: """Removes the specified distributed object listener. Returns silently if there is no such listener added before. Args: registration_id: The id of the registered listener. Returns: ``True`` if registration is removed, ``False`` otherwise. """ return await self._listener_service.deregister_listener(registration_id)
[docs] async def shutdown(self) -> None: """Shuts down this HazelcastClient.""" async with self._shutdown_lock: if self._internal_lifecycle_service.running: self._internal_lifecycle_service.fire_lifecycle_event(LifecycleState.SHUTTING_DOWN) self._internal_lifecycle_service.shutdown() self._near_cache_manager.destroy_near_caches() await self._connection_manager.shutdown() self._invocation_service.shutdown() self._statistics.shutdown() self._internal_lifecycle_service.fire_lifecycle_event(LifecycleState.SHUTDOWN)
@property def name(self) -> str: """Name of the client.""" return self._name @property def lifecycle_service(self) -> LifecycleService: """Lifecycle service allows you to check if the client is running and add and remove lifecycle listeners. """ return self._lifecycle_service @property def partition_service(self) -> PartitionService: """Partition service allows you to get partition count, introspect the partition owners, and partition ids of keys. """ return self._partition_service @property def cluster_service(self) -> ClusterService: """ClusterService: Cluster service allows you to get the list of the cluster members and add and remove membership listeners. """ return self._cluster_service @property def sql(self) -> SqlService: """Returns a service to execute distributed SQL queries.""" return self._sql_service @property def cp_subsystem(self) -> CPSubsystem: """CP Subsystem offers set of in-memory linearizable data structures.""" return self._cp_subsystem def _create_address_provider(self): config = self._config cluster_members = config.cluster_members address_list_provided = len(cluster_members) > 0 cloud_discovery_token = config.cloud_discovery_token cloud_enabled = cloud_discovery_token is not None if address_list_provided and cloud_enabled: raise IllegalStateError( "Only one discovery method can be enabled at a time. " "Cluster members given explicitly: %s, Hazelcast Cloud enabled: %s" % (address_list_provided, cloud_enabled) ) if cloud_enabled: connection_timeout = self._get_connection_timeout(config) return HazelcastCloudAddressProvider(cloud_discovery_token, connection_timeout) return DefaultAsyncioAddressProvider(cluster_members) def _create_client_name(self, client_id): client_name = self._config.client_name if client_name: return client_name return "hz.client_%s" % client_id async def _send_state_to_cluster(self): return await self._compact_schema_service.send_all_schemas() @staticmethod def _get_connection_timeout(config): timeout = config.connection_timeout return sys.maxsize if timeout == 0 else timeout @staticmethod def _init_load_balancer(config): load_balancer = config.load_balancer if not load_balancer: load_balancer = RoundRobinLB() return load_balancer
class _ClientContext: def __init__(self): self.client = None self.config = None self.invocation_service = None self.partition_service = None self.cluster_service = None self.connection_manager = None self.serialization_service = None self.listener_service = None self.proxy_manager = None self.near_cache_manager = None self.lock_reference_id_generator = None self.name = None self.reactor = None self.compact_schema_service = None def init_context( self, client, config, invocation_service, partition_service, cluster_service, connection_manager, serialization_service, listener_service, proxy_manager, near_cache_manager, lock_reference_id_generator, name, reactor, compact_schema_service, ): self.client = client self.config = config self.invocation_service = invocation_service self.partition_service = partition_service self.cluster_service = cluster_service self.connection_manager = connection_manager self.serialization_service = serialization_service self.listener_service = listener_service self.proxy_manager = proxy_manager self.near_cache_manager = near_cache_manager self.lock_reference_id_generator = lock_reference_id_generator self.name = name self.reactor = reactor self.compact_schema_service = compact_schema_service