Source code for hazelcast.proxy.executor

import typing
from uuid import uuid4
from hazelcast import future
from hazelcast.core import MemberInfo
from hazelcast.future import Future
from hazelcast.protocol.codec import (
    executor_service_shutdown_codec,
    executor_service_is_shutdown_codec,
    executor_service_submit_to_partition_codec,
    executor_service_submit_to_member_codec,
)
from hazelcast.proxy.base import Proxy
from hazelcast.serialization.compact import SchemaNotReplicatedError
from hazelcast.util import check_not_none


class Executor(Proxy["BlockingExecutor"]):
    """An object that executes submitted executable tasks.

    Every method returns a ``Future``; use :func:`blocking` to obtain a
    wrapper whose methods wait for and return the results directly.
    """

    def execute_on_key_owner(self, key: typing.Any, task: typing.Any) -> Future[typing.Any]:
        """Executes a task on the owner of the specified key.

        Args:
            key: The specified key. Must not be ``None``.
            task: A task executed on the owner of the specified key.
                Must not be ``None``.

        Returns:
            A future that resolves to the result of the task.
        """
        check_not_none(key, "key can't be None")
        check_not_none(task, "task can't be None")
        try:
            key_data = self._to_data(key)
            task_data = self._to_data(task)
        except SchemaNotReplicatedError as e:
            # Serialization reported a schema that is not replicated yet:
            # hand off to the retry helper with this method and its arguments.
            return self._send_schema_and_retry(e, self.execute_on_key_owner, key, task)

        # The partition is derived from the serialized key.
        partition_id = self._context.partition_service.get_partition_id(key_data)
        uuid = uuid4()

        def handler(message):
            return self._to_object(
                executor_service_submit_to_partition_codec.decode_response(message)
            )

        request = executor_service_submit_to_partition_codec.encode_request(
            self.name, uuid, task_data
        )
        return self._invoke_on_partition(request, partition_id, handler)

    def execute_on_member(self, member: MemberInfo, task: typing.Any) -> Future[typing.Any]:
        """Executes a task on the specified member.

        Args:
            member: The specified member.
            task: The task executed on the specified member.
                Must not be ``None``.

        Returns:
            A future that resolves to the result of the task.
        """
        check_not_none(task, "task can't be None")
        try:
            task_data = self._to_data(task)
        except SchemaNotReplicatedError as e:
            return self._send_schema_and_retry(e, self.execute_on_member, member, task)

        uuid = uuid4()
        return self._execute_on_member(uuid, task_data, member.uuid)

    def execute_on_members(
        self, members: typing.Sequence[MemberInfo], task: typing.Any
    ) -> Future[typing.List[typing.Any]]:
        """Executes a task on each of the specified members.

        Args:
            members: The specified members.
            task: The task executed on the specified members.
                Must not be ``None``.

        Returns:
            A future that resolves to the list of results of the tasks on
            each member.
        """
        # Same guard as execute_on_key_owner / execute_on_member, so that a
        # missing task is rejected up front instead of being serialized.
        check_not_none(task, "task can't be None")
        try:
            task_data = self._to_data(task)
        except SchemaNotReplicatedError as e:
            return self._send_schema_and_retry(e, self.execute_on_members, members, task)

        # A single uuid is generated once and shared by every per-member request.
        uuid = uuid4()
        futures = [self._execute_on_member(uuid, task_data, member.uuid) for member in members]
        return future.combine_futures(futures)

    def execute_on_all_members(self, task: typing.Any) -> Future[typing.List[typing.Any]]:
        """Executes a task on all the known cluster members.

        Args:
            task: The task executed on all the members.
                Must not be ``None``.

        Returns:
            A future that resolves to the list of results of the tasks on
            each member.
        """
        return self.execute_on_members(self._context.cluster_service.get_members(), task)

    def is_shutdown(self) -> Future[bool]:
        """Determines whether this executor has been shutdown or not.

        Returns:
            ``True`` if the executor has been shutdown, ``False`` otherwise.
        """
        request = executor_service_is_shutdown_codec.encode_request(self.name)
        return self._invoke(request, executor_service_is_shutdown_codec.decode_response)

    def shutdown(self) -> Future[None]:
        """Initiates a shutdown process which works orderly.

        Tasks that were submitted before shutdown are executed but new
        tasks will not be accepted.
        """
        request = executor_service_shutdown_codec.encode_request(self.name)
        return self._invoke(request)

    def _execute_on_member(self, uuid, task_data, member_uuid):
        """Sends already-serialized task data to the member with ``member_uuid``.

        Args:
            uuid: The uuid passed to the submit-to-member request.
            task_data: The serialized task.
            member_uuid: The uuid of the target member; used both in the
                request and as the invocation target.

        Returns:
            The future returned by the invocation; its response is decoded
            and converted back to an object by the handler.
        """

        def handler(message):
            return self._to_object(executor_service_submit_to_member_codec.decode_response(message))

        request = executor_service_submit_to_member_codec.encode_request(
            self.name, uuid, task_data, member_uuid
        )
        return self._invoke_on_target(request, member_uuid, handler)

    def blocking(self) -> "BlockingExecutor":
        """Returns a ``BlockingExecutor`` that wraps this executor."""
        return BlockingExecutor(self)
class BlockingExecutor(Executor):
    """Blocking view over an ``Executor``.

    Each operation is forwarded to the wrapped executor and the returned
    future is resolved with ``result()`` before returning to the caller.
    """

    __slots__ = ("_wrapped", "name", "service_name")

    def __init__(self, wrapped: Executor):
        # Copy the identifying attributes from the wrapped proxy and keep a
        # reference to it for delegation.
        self._wrapped = wrapped
        self.name = wrapped.name
        self.service_name = wrapped.service_name

    def execute_on_key_owner(  # type: ignore[override]
        self,
        key: typing.Any,
        task: typing.Any,
    ) -> typing.Any:
        """Runs ``execute_on_key_owner`` on the wrapped executor and waits for its result."""
        pending = self._wrapped.execute_on_key_owner(key, task)
        return pending.result()

    def execute_on_member(  # type: ignore[override]
        self,
        member: MemberInfo,
        task: typing.Any,
    ) -> typing.Any:
        """Runs ``execute_on_member`` on the wrapped executor and waits for its result."""
        pending = self._wrapped.execute_on_member(member, task)
        return pending.result()

    def execute_on_members(  # type: ignore[override]
        self,
        members: typing.Sequence[MemberInfo],
        task: typing.Any,
    ) -> typing.List[typing.Any]:
        """Runs ``execute_on_members`` on the wrapped executor and waits for its result."""
        pending = self._wrapped.execute_on_members(members, task)
        return pending.result()

    def execute_on_all_members(  # type: ignore[override]
        self,
        task: typing.Any,
    ) -> typing.List[typing.Any]:
        """Runs ``execute_on_all_members`` on the wrapped executor and waits for its result."""
        pending = self._wrapped.execute_on_all_members(task)
        return pending.result()

    def is_shutdown(  # type: ignore[override]
        self,
    ) -> bool:
        """Runs ``is_shutdown`` on the wrapped executor and waits for its result."""
        pending = self._wrapped.is_shutdown()
        return pending.result()

    def shutdown(  # type: ignore[override]
        self,
    ) -> None:
        """Runs ``shutdown`` on the wrapped executor and waits for it to complete."""
        pending = self._wrapped.shutdown()
        return pending.result()

    def blocking(self) -> "BlockingExecutor":
        """Returns this instance, which is already the blocking wrapper."""
        return self

    def destroy(self) -> bool:
        """Forwards ``destroy`` to the wrapped executor and returns its result."""
        outcome = self._wrapped.destroy()
        return outcome

    def __repr__(self) -> str:
        """Returns the wrapped executor's representation."""
        return self._wrapped.__repr__()