Source code for hazelcast.client

import logging
import sys
import threading
import typing

from hazelcast.cluster import ClusterService, _InternalClusterService
from hazelcast.compact import CompactSchemaService
from hazelcast.config import Config
from hazelcast.connection import ConnectionManager, DefaultAddressProvider
from hazelcast.core import DistributedObjectEvent, DistributedObjectInfo
from hazelcast.cp import CPSubsystem, ProxySessionManager
from hazelcast.discovery import HazelcastCloudAddressProvider
from hazelcast.errors import IllegalStateError, InvalidConfigurationError
from hazelcast.future import Future
from hazelcast.invocation import InvocationService, Invocation
from hazelcast.lifecycle import LifecycleService, LifecycleState, _InternalLifecycleService
from hazelcast.listener import ClusterViewListenerService, ListenerService
from hazelcast.near_cache import NearCacheManager
from hazelcast.partition import PartitionService, _InternalPartitionService
from hazelcast.protocol.codec import (
    client_add_distributed_object_listener_codec,
    client_get_distributed_objects_codec,
    client_remove_distributed_object_listener_codec,
)
from hazelcast.proxy import (
    EXECUTOR_SERVICE,
    FLAKE_ID_GENERATOR_SERVICE,
    LIST_SERVICE,
    MAP_SERVICE,
    MULTI_MAP_SERVICE,
    PN_COUNTER_SERVICE,
    QUEUE_SERVICE,
    RELIABLE_TOPIC_SERVICE,
    REPLICATED_MAP_SERVICE,
    RINGBUFFER_SERVICE,
    SET_SERVICE,
    TOPIC_SERVICE,
    Executor,
    FlakeIdGenerator,
    List,
    MultiMap,
    PNCounter,
    ProxyManager,
    Queue,
    ReliableTopic,
    ReplicatedMap,
    Ringbuffer,
    Set,
    Topic,
)
from hazelcast.proxy.base import Proxy
from hazelcast.proxy.map import Map
from hazelcast.reactor import AsyncoreReactor
from hazelcast.serialization import SerializationServiceV1
from hazelcast.sql import SqlService, _InternalSqlService
from hazelcast.statistics import Statistics
from hazelcast.transaction import TWO_PHASE, Transaction, TransactionManager
from hazelcast.types import KeyType, ValueType, ItemType, MessageType
from hazelcast.util import AtomicInteger, RoundRobinLB

__all__ = ("HazelcastClient",)

_logger = logging.getLogger(__name__)


class HazelcastClient:
    """Hazelcast client instance to access and manipulate distributed data
    structures on the Hazelcast clusters.
    """

    _CLIENT_ID = AtomicInteger()

    def __init__(self, config: Config = None, **kwargs):
        """The client can be configured either by:

        - providing a configuration object as the first parameter of the
          constructor

        .. code:: python

            from hazelcast import HazelcastClient
            from hazelcast.config import Config

            config = Config()
            config.cluster_name = "a-cluster"
            client = HazelcastClient(config)

        - passing configuration options as keyword arguments

        .. code:: python

            from hazelcast import HazelcastClient

            client = HazelcastClient(
                cluster_name="a-cluster",
            )

        See the :class:`hazelcast.config.Config` documentation for the
        possible configuration options.

        Args:
            config: Optional configuration object.
            **kwargs: Optional keyword arguments of the client configuration.
        """
        if config:
            if kwargs:
                raise InvalidConfigurationError(
                    "Ambiguous client configuration is found. Either provide "
                    "the config object as the only parameter, or do not "
                    "pass it and use keyword arguments to configure the "
                    "client."
                )
        else:
            config = Config.from_dict(kwargs)

        self._config = config
        self._context = _ClientContext()
        client_id = HazelcastClient._CLIENT_ID.get_and_increment()
        self._name = self._create_client_name(client_id)
        self._reactor = AsyncoreReactor()
        self._serialization_service = SerializationServiceV1(config)
        self._near_cache_manager = NearCacheManager(config, self._serialization_service)
        self._internal_lifecycle_service = _InternalLifecycleService(config)
        self._lifecycle_service = LifecycleService(self._internal_lifecycle_service)
        self._internal_cluster_service = _InternalClusterService(self, config)
        self._cluster_service = ClusterService(self._internal_cluster_service)
        self._invocation_service = InvocationService(self, config, self._reactor)
        self._compact_schema_service = CompactSchemaService(
            self._serialization_service.compact_stream_serializer,
            self._invocation_service,
            self._cluster_service,
            self._reactor,
            self._config,
        )
        self._address_provider = self._create_address_provider()
        self._internal_partition_service = _InternalPartitionService(self)
        self._partition_service = PartitionService(
            self._internal_partition_service,
            self._serialization_service,
            self._compact_schema_service.send_schema_and_retry,
        )
        self._connection_manager = ConnectionManager(
            self,
            config,
            self._reactor,
            self._address_provider,
            self._internal_lifecycle_service,
            self._internal_partition_service,
            self._internal_cluster_service,
            self._invocation_service,
            self._near_cache_manager,
            self._send_state_to_cluster,
        )
        self._load_balancer = self._init_load_balancer(config)
        self._listener_service = ListenerService(
            self,
            config,
            self._connection_manager,
            self._invocation_service,
            self._compact_schema_service,
        )
        self._proxy_manager = ProxyManager(self._context)
        self._cp_subsystem = CPSubsystem(self._context)
        self._proxy_session_manager = ProxySessionManager(self._context)
        self._transaction_manager = TransactionManager(self._context)
        self._lock_reference_id_generator = AtomicInteger(1)
        self._statistics = Statistics(
            self,
            config,
            self._reactor,
            self._connection_manager,
            self._invocation_service,
            self._near_cache_manager,
        )
        self._cluster_view_listener = ClusterViewListenerService(
            self,
            self._connection_manager,
            self._internal_partition_service,
            self._internal_cluster_service,
            self._invocation_service,
        )
        self._shutdown_lock = threading.RLock()
        self._invocation_service.init(
            self._internal_partition_service,
            self._connection_manager,
            self._listener_service,
            self._compact_schema_service,
        )
        self._internal_sql_service = _InternalSqlService(
            self._connection_manager,
            self._serialization_service,
            self._invocation_service,
            self._compact_schema_service.send_schema_and_retry,
        )
        self._sql_service = SqlService(self._internal_sql_service)
        self._init_context()
        self._start()

    def _init_context(self):
        self._context.init_context(
            self,
            self._config,
            self._invocation_service,
            self._internal_partition_service,
            self._internal_cluster_service,
            self._connection_manager,
            self._serialization_service,
            self._listener_service,
            self._proxy_manager,
            self._near_cache_manager,
            self._lock_reference_id_generator,
            self._name,
            self._proxy_session_manager,
            self._reactor,
            self._compact_schema_service,
        )

    def _start(self):
        self._reactor.start()
        try:
            self._internal_lifecycle_service.start()
            self._invocation_service.start()
            membership_listeners = self._config.membership_listeners
            self._internal_cluster_service.start(self._connection_manager, membership_listeners)
            self._cluster_view_listener.start()
            self._connection_manager.start(self._load_balancer)
            sync_start = not self._config.async_start
            if sync_start:
                self._internal_cluster_service.wait_initial_member_list_fetched()
            self._connection_manager.connect_to_all_cluster_members(sync_start)
            self._listener_service.start()
            self._invocation_service.add_backup_listener()
            self._load_balancer.init(self._cluster_service)
            self._statistics.start()
        except:
            self.shutdown()
            raise
        _logger.info("Client started")
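
    # Usage sketch (not part of the library source): the constructor both
    # configures and starts the client. With the ``async_start`` option
    # (assumed keyword, mirroring ``config.async_start`` used in ``_start``
    # above) the constructor returns before the initial member list is
    # fetched, so readiness can be checked through the lifecycle service
    # (``is_running`` assumed):
    #
    #   client = HazelcastClient(cluster_name="a-cluster", async_start=True)
    #   if client.lifecycle_service.is_running():
    #       print("client is running:", client.name)
    #   client.shutdown()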

    def get_executor(self, name: str) -> Executor:
        """Creates a cluster-wide ExecutorService.

        Args:
            name: Name of the Executor proxy.

        Returns:
            Executor proxy for the given name.
        """
        return self._proxy_manager.get_or_create(EXECUTOR_SERVICE, name)

    def get_flake_id_generator(self, name: str) -> FlakeIdGenerator:
        """Creates or returns a cluster-wide FlakeIdGenerator.

        Args:
            name: Name of the FlakeIdGenerator proxy.

        Returns:
            FlakeIdGenerator proxy for the given name.
        """
        return self._proxy_manager.get_or_create(FLAKE_ID_GENERATOR_SERVICE, name)
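
    # Example sketch (illustrative only): generating a cluster-wide unique id
    # through the blocking facade of the proxy.
    #
    #   generator = client.get_flake_id_generator("id-gen").blocking()
    #   unique_id = generator.new_id()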

    def get_queue(self, name: str) -> Queue[ItemType]:
        """Returns the distributed queue instance with the specified name.

        Args:
            name: Name of the distributed queue.

        Returns:
            Distributed queue instance with the specified name.
        """
        return self._proxy_manager.get_or_create(QUEUE_SERVICE, name)
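
    # Example sketch (illustrative only): basic produce/consume on the
    # distributed queue via its blocking facade.
    #
    #   queue = client.get_queue("work-queue").blocking()
    #   queue.offer("task-1")
    #   task = queue.poll()  # returns None if the queue is empty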

    def get_list(self, name: str) -> List[ItemType]:
        """Returns the distributed list instance with the specified name.

        Args:
            name: Name of the distributed list.

        Returns:
            Distributed list instance with the specified name.
        """
        return self._proxy_manager.get_or_create(LIST_SERVICE, name)

    def get_map(self, name: str) -> Map[KeyType, ValueType]:
        """Returns the distributed map instance with the specified name.

        Args:
            name: Name of the distributed map.

        Returns:
            Distributed map instance with the specified name.
        """
        return self._proxy_manager.get_or_create(MAP_SERVICE, name)
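
    # Example sketch (illustrative only): the returned proxy exposes async
    # methods that return futures; ``blocking()`` wraps it in a synchronous
    # facade.
    #
    #   my_map = client.get_map("my-map").blocking()
    #   my_map.put("key", "value")
    #   assert my_map.get("key") == "value"
    #
    #   # or, working with futures directly:
    #   client.get_map("my-map").put("key", "value").result()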

    def get_multi_map(self, name: str) -> MultiMap[KeyType, ValueType]:
        """Returns the distributed MultiMap instance with the specified name.

        Args:
            name: Name of the distributed MultiMap.

        Returns:
            Distributed MultiMap instance with the specified name.
        """
        return self._proxy_manager.get_or_create(MULTI_MAP_SERVICE, name)
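
    # Example sketch (illustrative only): a MultiMap stores multiple values
    # per key; ``get`` returns the collection of values mapped to the key.
    #
    #   multi_map = client.get_multi_map("my-multi-map").blocking()
    #   multi_map.put("key", "value-1")
    #   multi_map.put("key", "value-2")
    #   values = multi_map.get("key")  # both values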

    def get_pn_counter(self, name: str) -> PNCounter:
        """Returns the PN Counter instance with the specified name.

        Args:
            name: Name of the PN Counter.

        Returns:
            Distributed PN Counter instance with the specified name.
        """
        return self._proxy_manager.get_or_create(PN_COUNTER_SERVICE, name)
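
    # Example sketch (illustrative only): a PN Counter is a CRDT counter that
    # can be incremented and decremented concurrently across the cluster.
    #
    #   counter = client.get_pn_counter("my-counter").blocking()
    #   counter.add_and_get(5)
    #   current = counter.get()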

    def get_reliable_topic(self, name: str) -> ReliableTopic[MessageType]:
        """Returns the ReliableTopic instance with the specified name.

        Args:
            name: Name of the ReliableTopic.

        Returns:
            Distributed ReliableTopic instance with the specified name.
        """
        return self._proxy_manager.get_or_create(RELIABLE_TOPIC_SERVICE, name)

    def get_replicated_map(self, name: str) -> ReplicatedMap[KeyType, ValueType]:
        """Returns the distributed ReplicatedMap instance with the specified name.

        Args:
            name: Name of the distributed ReplicatedMap.

        Returns:
            Distributed ReplicatedMap instance with the specified name.
        """
        return self._proxy_manager.get_or_create(REPLICATED_MAP_SERVICE, name)

    def get_ringbuffer(self, name: str) -> Ringbuffer[ItemType]:
        """Returns the distributed Ringbuffer instance with the specified name.

        Args:
            name: Name of the distributed Ringbuffer.

        Returns:
            Distributed Ringbuffer instance with the specified name.
        """
        return self._proxy_manager.get_or_create(RINGBUFFER_SERVICE, name)
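
    # Example sketch (illustrative only): appending to and reading from the
    # Ringbuffer by sequence number.
    #
    #   ringbuffer = client.get_ringbuffer("my-ringbuffer").blocking()
    #   sequence = ringbuffer.add("item")
    #   item = ringbuffer.read_one(sequence)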

    def get_set(self, name: str) -> Set[ItemType]:
        """Returns the distributed Set instance with the specified name.

        Args:
            name: Name of the distributed Set.

        Returns:
            Distributed Set instance with the specified name.
        """
        return self._proxy_manager.get_or_create(SET_SERVICE, name)

    def get_topic(self, name: str) -> Topic[MessageType]:
        """Returns the Topic instance with the specified name.

        Args:
            name: Name of the Topic.

        Returns:
            Topic instance with the specified name.
        """
        return self._proxy_manager.get_or_create(TOPIC_SERVICE, name)
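
    # Example sketch (illustrative only): publish/subscribe on the Topic;
    # ``add_listener`` registers an ``on_message`` callback.
    #
    #   topic = client.get_topic("my-topic").blocking()
    #   topic.add_listener(on_message=lambda message: print(message))
    #   topic.publish("hello")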

    def new_transaction(
        self, timeout: float = 120, durability: int = 1, type: int = TWO_PHASE
    ) -> Transaction:
        """Creates a new Transaction associated with the current thread
        using default or given options.

        Args:
            timeout: The timeout in seconds that determines the maximum
                lifespan of the transaction. For example, a transaction
                configured with a timeout of 2 minutes is automatically
                rolled back if it has not committed within that time.
            durability: The number of cluster members that can take over if a
                member fails during a transaction commit or rollback.
            type: The transaction type, which can be ``TWO_PHASE`` or
                ``ONE_PHASE``.

        Returns:
            New Transaction associated with the current thread.
        """
        return self._transaction_manager.new_transaction(timeout, durability, type)
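
    # Example sketch (illustrative only, assuming the begin/commit/rollback
    # and transactional proxy methods of ``Transaction``): operations on
    # transactional proxies are committed or rolled back together.
    #
    #   transaction = client.new_transaction(timeout=30)
    #   transaction.begin()
    #   try:
    #       txn_map = transaction.get_map("my-map")
    #       txn_map.put("key", "value")
    #       transaction.commit()
    #   except Exception:
    #       transaction.rollback()
    #       raise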

    def add_distributed_object_listener(
        self, listener_func: typing.Callable[[DistributedObjectEvent], None]
    ) -> Future[str]:
        """Adds a listener which will be notified when a new distributed
        object is created or destroyed.

        Args:
            listener_func: Function to be called when a distributed object is
                created or destroyed.

        Returns:
            A registration id which is used as a key to remove the listener.
        """
        is_smart = self._config.smart_routing
        codec = client_add_distributed_object_listener_codec
        request = codec.encode_request(is_smart)

        def handle_distributed_object_event(name, service_name, event_type, source):
            event = DistributedObjectEvent(name, service_name, event_type, source)
            listener_func(event)

        def event_handler(client_message):
            return codec.handle(client_message, handle_distributed_object_event)

        return self._listener_service.register_listener(
            request,
            codec.decode_response,
            client_remove_distributed_object_listener_codec.encode_request,
            event_handler,
        )
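
    # Example sketch (illustrative only): the listener receives a
    # ``DistributedObjectEvent``; the registration id resolved from the
    # returned future can later be passed to
    # ``remove_distributed_object_listener``.
    #
    #   def on_distributed_object_event(event):
    #       print(event.name, event.service_name, event.event_type)
    #
    #   registration_id = client.add_distributed_object_listener(
    #       on_distributed_object_event
    #   ).result()
    #   ...
    #   client.remove_distributed_object_listener(registration_id).result()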

    def remove_distributed_object_listener(self, registration_id: str) -> Future[bool]:
        """Removes the specified distributed object listener.

        Returns silently if no such listener was added before.

        Args:
            registration_id: The id of the registered listener.

        Returns:
            ``True`` if the registration is removed, ``False`` otherwise.
        """
        return self._listener_service.deregister_listener(registration_id)

    def get_distributed_objects(self) -> Future[typing.List[Proxy]]:
        """Returns all distributed objects, such as queues, maps, sets,
        lists, topics, locks, and multimaps.

        Also, as a side effect, it clears the local instances of the
        destroyed proxies.

        Returns:
            List of instances created by Hazelcast.
        """
        request = client_get_distributed_objects_codec.encode_request()
        invocation = Invocation(request, response_handler=lambda m: m)
        self._invocation_service.invoke(invocation)

        local_distributed_object_infos = {
            DistributedObjectInfo(dist_obj.service_name, dist_obj.name)
            for dist_obj in self._proxy_manager.get_distributed_objects()
        }

        response = client_get_distributed_objects_codec.decode_response(invocation.future.result())
        for dist_obj_info in response:
            local_distributed_object_infos.discard(dist_obj_info)
            self._proxy_manager.get_or_create(
                dist_obj_info.service_name, dist_obj_info.name, create_on_remote=False
            )

        for dist_obj_info in local_distributed_object_infos:
            self._proxy_manager.destroy_proxy(
                dist_obj_info.service_name, dist_obj_info.name, destroy_on_remote=False
            )

        return self._proxy_manager.get_distributed_objects()

    def shutdown(self) -> None:
        """Shuts down this HazelcastClient."""
        with self._shutdown_lock:
            if self._internal_lifecycle_service.running:
                self._internal_lifecycle_service.fire_lifecycle_event(LifecycleState.SHUTTING_DOWN)
                self._internal_lifecycle_service.shutdown()
                self._proxy_session_manager.shutdown().result()
                self._near_cache_manager.destroy_near_caches()
                self._connection_manager.shutdown()
                self._invocation_service.shutdown()
                self._statistics.shutdown()
                self._reactor.shutdown()
                self._internal_lifecycle_service.fire_lifecycle_event(LifecycleState.SHUTDOWN)
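
    # Example sketch (illustrative only): pairing the client with a
    # try/finally block so it is always shut down, releasing its connections
    # and background threads.
    #
    #   client = HazelcastClient(cluster_name="a-cluster")
    #   try:
    #       ...  # use the client
    #   finally:
    #       client.shutdown()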

    @property
    def name(self) -> str:
        """Name of the client."""
        return self._name

    @property
    def lifecycle_service(self) -> LifecycleService:
        """Lifecycle service allows you to check if the client is running and
        add and remove lifecycle listeners.
        """
        return self._lifecycle_service

    @property
    def partition_service(self) -> PartitionService:
        """Partition service allows you to get the partition count, introspect
        the partition owners, and get the partition ids of keys.
        """
        return self._partition_service

    @property
    def cluster_service(self) -> ClusterService:
        """Cluster service allows you to get the list of the cluster members
        and add and remove membership listeners.
        """
        return self._cluster_service

    @property
    def cp_subsystem(self) -> CPSubsystem:
        """CP Subsystem offers a set of in-memory linearizable data structures."""
        return self._cp_subsystem

    @property
    def sql(self) -> SqlService:
        """Returns a service to execute distributed SQL queries."""
        return self._sql_service

    def _create_address_provider(self):
        config = self._config
        cluster_members = config.cluster_members
        address_list_provided = len(cluster_members) > 0
        cloud_discovery_token = config.cloud_discovery_token
        cloud_enabled = cloud_discovery_token is not None
        if address_list_provided and cloud_enabled:
            raise IllegalStateError(
                "Only one discovery method can be enabled at a time. "
                "Cluster members given explicitly: %s, Hazelcast Viridian enabled: %s"
                % (address_list_provided, cloud_enabled)
            )

        if cloud_enabled:
            connection_timeout = self._get_connection_timeout(config)
            return HazelcastCloudAddressProvider(cloud_discovery_token, connection_timeout)

        return DefaultAddressProvider(cluster_members)

    def _create_client_name(self, client_id):
        client_name = self._config.client_name
        if client_name:
            return client_name
        return "hz.client_%s" % client_id

    def _send_state_to_cluster(self) -> Future:
        return self._compact_schema_service.send_all_schemas()

    @staticmethod
    def _get_connection_timeout(config):
        timeout = config.connection_timeout
        return sys.maxsize if timeout == 0 else timeout

    @staticmethod
    def _init_load_balancer(config):
        load_balancer = config.load_balancer
        if not load_balancer:
            load_balancer = RoundRobinLB()
        return load_balancer
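
# Property access sketch (illustrative only; ``is_running`` on the
# LifecycleService and ``get_partition_count`` on the PartitionService are
# assumed helper methods):
#
#   print(client.name)
#   print(client.lifecycle_service.is_running())
#   print(client.partition_service.get_partition_count())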


class _ClientContext:
    """Context holding all the required services, managers and the
    configuration for a Hazelcast client.
    """

    def __init__(self):
        self.client = None
        self.config = None
        self.invocation_service = None
        self.partition_service = None
        self.cluster_service = None
        self.connection_manager = None
        self.serialization_service = None
        self.listener_service = None
        self.proxy_manager = None
        self.near_cache_manager = None
        self.lock_reference_id_generator = None
        self.name = None
        self.proxy_session_manager = None
        self.reactor = None
        self.compact_schema_service = None

    def init_context(
        self,
        client,
        config,
        invocation_service,
        partition_service,
        cluster_service,
        connection_manager,
        serialization_service,
        listener_service,
        proxy_manager,
        near_cache_manager,
        lock_reference_id_generator,
        name,
        proxy_session_manager,
        reactor,
        compact_schema_service,
    ):
        self.client = client
        self.config = config
        self.invocation_service = invocation_service
        self.partition_service = partition_service
        self.cluster_service = cluster_service
        self.connection_manager = connection_manager
        self.serialization_service = serialization_service
        self.listener_service = listener_service
        self.proxy_manager = proxy_manager
        self.near_cache_manager = near_cache_manager
        self.lock_reference_id_generator = lock_reference_id_generator
        self.name = name
        self.proxy_session_manager = proxy_session_manager
        self.reactor = reactor
        self.compact_schema_service = compact_schema_service