import abc
import typing
import uuid
from hazelcast.core import MemberInfo
from hazelcast.types import KeyType, ValueType, ItemType, MessageType, BlockingProxyType
from hazelcast.invocation import Invocation
from hazelcast.partition import string_partition_strategy
from hazelcast.util import get_attr_name
MAX_SIZE = float("inf")
def _no_op_response_handler(_):
return None
[docs]class Proxy(typing.Generic[BlockingProxyType], abc.ABC):
"""Provides basic functionality for Hazelcast Proxies."""
def __init__(self, service_name: str, name: str, context):
self.service_name = service_name
self.name = name
self._context = context
self._invocation_service = context.invocation_service
self._partition_service = context.partition_service
serialization_service = context.serialization_service
self._to_object = serialization_service.to_object
self._to_data = serialization_service.to_data
listener_service = context.listener_service
self._register_listener = listener_service.register_listener
self._deregister_listener = listener_service.deregister_listener
self._is_smart = context.config.smart_routing
self._send_schema_and_retry = context.compact_schema_service.send_schema_and_retry
[docs] def destroy(self) -> bool:
"""Destroys this proxy.
Returns:
``True`` if this proxy is destroyed successfully, ``False``
otherwise.
"""
self._on_destroy()
return self._context.proxy_manager.destroy_proxy(self.service_name, self.name)
def _on_destroy(self):
pass
def __repr__(self) -> str:
return '%s(name="%s")' % (type(self).__name__, self.name)
def _invoke(self, request, response_handler=_no_op_response_handler):
invocation = Invocation(request, response_handler=response_handler)
self._invocation_service.invoke(invocation)
return invocation.future
def _invoke_on_target(self, request, uuid, response_handler=_no_op_response_handler):
invocation = Invocation(request, uuid=uuid, response_handler=response_handler)
self._invocation_service.invoke(invocation)
return invocation.future
def _invoke_on_key(self, request, key_data, response_handler=_no_op_response_handler):
partition_id = self._partition_service.get_partition_id(key_data)
invocation = Invocation(
request, partition_id=partition_id, response_handler=response_handler
)
self._invocation_service.invoke(invocation)
return invocation.future
def _invoke_on_partition(self, request, partition_id, response_handler=_no_op_response_handler):
invocation = Invocation(
request, partition_id=partition_id, response_handler=response_handler
)
self._invocation_service.invoke(invocation)
return invocation.future
[docs] @abc.abstractmethod
def blocking(self) -> BlockingProxyType:
"""Returns a version of this proxy with only blocking method calls."""
pass
[docs]class PartitionSpecificProxy(Proxy[BlockingProxyType], abc.ABC):
"""Provides basic functionality for Partition Specific Proxies."""
def __init__(self, service_name, name, context):
super(PartitionSpecificProxy, self).__init__(service_name, name, context)
partition_key = context.serialization_service.to_data(string_partition_strategy(self.name))
self._partition_id = context.partition_service.get_partition_id(partition_key)
def _invoke(self, request, response_handler=_no_op_response_handler):
invocation = Invocation(
request, partition_id=self._partition_id, response_handler=response_handler
)
self._invocation_service.invoke(invocation)
return invocation.future
[docs]class TransactionalProxy:
"""Provides an interface for all transactional distributed objects."""
def __init__(self, name, transaction, context):
self.name = name
self.transaction = transaction
self._invocation_service = context.invocation_service
serialization_service = context.serialization_service
self._to_object = serialization_service.to_object
self._to_data = serialization_service.to_data
self._send_schema_and_retry = context.compact_schema_service.send_schema_and_retry
def _send_schema(self, error):
return self._send_schema_and_retry(error, lambda: None).result()
def _invoke(self, request, response_handler=_no_op_response_handler):
invocation = Invocation(
request, connection=self.transaction.connection, response_handler=response_handler
)
self._invocation_service.invoke(invocation)
return invocation.future.result()
def __repr__(self):
return '%s(name="%s")' % (type(self).__name__, self.name)
[docs]class ItemEventType:
"""Type of item events."""
ADDED = 1
"""
Fired when an item is added.
"""
REMOVED = 2
"""
Fired when an item is removed.
"""
[docs]class EntryEventType:
"""Type of entry event."""
ADDED = 1
"""
Fired if an entry is added.
"""
REMOVED = 2
"""
Fired if an entry is removed.
"""
UPDATED = 4
"""
Fired if an entry is updated.
"""
EVICTED = 8
"""
Fired if an entry is evicted.
"""
EXPIRED = 16
"""
Fired if an entry is expired.
"""
EVICT_ALL = 32
"""
Fired if all entries are evicted.
"""
CLEAR_ALL = 64
"""
Fired if all entries are cleared.
"""
MERGED = 128
"""
Fired if an entry is merged after a network partition.
"""
INVALIDATION = 256
"""
Fired if an entry is invalidated.
"""
LOADED = 512
"""
Fired if an entry is loaded.
"""
[docs]class ItemEvent(typing.Generic[ItemType]):
"""Map Item event.
Attributes:
name: Name of the proxy that fired the event.
item: The item related to the event.
event_type: Type of the event.
member: Member that fired the event.
"""
def __init__(self, name: str, item: ItemEventType, event_type: int, member: MemberInfo):
self.name = name
self.item = item
self.event_type = event_type
self.member = member
[docs]class EntryEvent(typing.Generic[KeyType, ValueType]):
"""Map Entry event.
Attributes:
event_type: Type of the event.
uuid: UUID of the member that fired the event.
number_of_affected_entries: Number of affected entries by this event.
key: The key of this entry event.
value: The value of the entry event.
old_value: The old value of the entry event.
merging_value: The incoming merging value of the entry event.
"""
def __init__(
self,
key: KeyType,
value: ValueType,
old_value: ValueType,
merging_value: ValueType,
event_type: int,
member_uuid: uuid.UUID,
number_of_affected_entries: int,
):
self.key = key
self.value = value
self.old_value = old_value
self.merging_value = merging_value
self.event_type = event_type
self.uuid = member_uuid
self.number_of_affected_entries = number_of_affected_entries
def __repr__(self):
return (
"EntryEvent(key=%s, value=%s, old_value=%s, merging_value=%s, event_type=%s, uuid=%s, "
"number_of_affected_entries=%s)"
% (
self.key,
self.value,
self.old_value,
self.merging_value,
get_attr_name(EntryEventType, self.event_type),
self.uuid,
self.number_of_affected_entries,
)
)
[docs]class TopicMessage(typing.Generic[MessageType]):
"""Topic message.
Attributes:
name: Name of the proxy that fired the event.
message: The message sent to Topic.
publish_time: UNIX time that the event is published as seconds.
member: Member that fired the event.
"""
__slots__ = ("name", "message", "publish_time", "member")
def __init__(self, name: str, message: MessageType, publish_time: int, member: MemberInfo):
self.name = name
self.message = message
self.publish_time = publish_time
self.member = member
def __repr__(self):
return "TopicMessage(message=%s, publish_time=%s, topic_name=%s, publishing_member=%s)" % (
self.message,
self.publish_time,
self.name,
self.member,
)
def get_entry_listener_flags(**kwargs):
flags = 0
for key, value in kwargs.items():
if value:
flags |= getattr(EntryEventType, key)
return flags