import asyncio
import typing
from uuid import uuid4
from hazelcast.core import MemberInfo
from hazelcast.protocol.codec import (
executor_service_shutdown_codec,
executor_service_is_shutdown_codec,
executor_service_submit_to_partition_codec,
executor_service_submit_to_member_codec,
)
from hazelcast.internal.asyncio_proxy.base import Proxy
from hazelcast.serialization.compact import SchemaNotReplicatedError
from hazelcast.util import check_not_none
[docs]
class Executor(Proxy):
"""An object that executes submitted executable tasks."""
[docs]
async def execute_on_key_owner(self, key: typing.Any, task: typing.Any) -> typing.Any:
"""Executes a task on the owner of the specified key.
Args:
key: The specified key.
task: A task executed on the owner of the specified key.
Returns:
The result of the task.
"""
check_not_none(key, "key can't be None")
check_not_none(task, "task can't be None")
try:
key_data = self._to_data(key)
task_data = self._to_data(task)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.execute_on_key_owner, key, task)
partition_id = self._partition_service.get_partition_id(key_data)
uuid = uuid4()
def handler(message):
return self._to_object(
executor_service_submit_to_partition_codec.decode_response(message)
)
request = executor_service_submit_to_partition_codec.encode_request(
self.name, uuid, task_data
)
return await self._ainvoke_on_partition(request, partition_id, handler)
[docs]
async def execute_on_member(self, member: MemberInfo, task: typing.Any) -> typing.Any:
"""Executes a task on the specified member.
Args:
member: The specified member.
task: The task executed on the specified member.
Returns:
The result of the task.
"""
check_not_none(task, "task can't be None")
try:
task_data = self._to_data(task)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.execute_on_member, member, task)
uuid = uuid4()
return await self._execute_on_member(uuid, task_data, member.uuid)
[docs]
async def execute_on_members(
self, members: typing.Sequence[MemberInfo], task: typing.Any
) -> typing.List[typing.Any]:
"""Executes a task on each of the specified members.
Args:
members: The specified members.
task: The task executed on the specified members.
Returns:
The list of results of the tasks on each member.
"""
try:
task_data = self._to_data(task)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.execute_on_members, members, task)
tasks = []
async with asyncio.TaskGroup() as tg: # type: ignore[attr-defined]
tasks = [
tg.create_task(self._execute_on_member(uuid4(), task_data, member.uuid))
for member in members
]
return [task.result() for task in tasks]
[docs]
async def execute_on_all_members(self, task: typing.Any) -> typing.List[typing.Any]:
"""Executes a task on all the known cluster members.
Args:
task: The task executed on the all the members.
Returns:
The list of results of the tasks on each member.
"""
return await self.execute_on_members(self._context.cluster_service.get_members(), task)
[docs]
async def is_shutdown(self) -> bool:
"""Determines whether this executor has been shutdown or not.
Returns:
``True`` if the executor has been shutdown, ``False`` otherwise.
"""
request = executor_service_is_shutdown_codec.encode_request(self.name)
return await self._invoke(request, executor_service_is_shutdown_codec.decode_response)
[docs]
async def shutdown(self) -> None:
"""Initiates a shutdown process which works orderly. Tasks that were
submitted before shutdown are executed but new task will not be
accepted.
"""
request = executor_service_shutdown_codec.encode_request(self.name)
return await self._invoke(request)
async def _execute_on_member(self, uuid, task_data, member_uuid) -> typing.Any:
def handler(message):
return self._to_object(executor_service_submit_to_member_codec.decode_response(message))
request = executor_service_submit_to_member_codec.encode_request(
self.name, uuid, task_data, member_uuid
)
return await self._ainvoke_on_target(request, member_uuid, handler)
async def create_executor_proxy(service_name, name, context):
return Executor(service_name, name, context)