Asyncio HazelcastClient API Documentation

class AtomicLong(context, group_id, service_name, proxy_name, object_name)[source]

Bases: BaseCPProxy

AtomicLong is a redundant and highly available distributed counter for 64-bit integers (long type in Java).

It works on top of the Raft consensus algorithm. It offers linearizability during crash failures and network partitions. It is CP with respect to the CAP principle. If a network partition occurs, it remains available on at most one side of the partition.

AtomicLong implementation does not offer exactly-once / effectively-once execution semantics. It goes with at-least-once execution semantics by default and can cause an API call to be committed multiple times in case of CP member failures. It can be tuned to offer at-most-once execution semantics. Please see fail-on-indeterminate-operation-state server-side setting.

async add_and_get(delta: int) int[source]

Atomically adds the given value to the current value.

Parameters:

delta – The value to add to the current value.

Returns:

The updated value, the given value added to the current value.

async compare_and_set(expect: int, update: int) bool[source]

Atomically sets the value to the given updated value only if the current value equals the expected value.

Parameters:
  • expect – The expected value.

  • update – The new value.

Returns:

True if successful; or False if the actual value was not equal to the expected value.

async decrement_and_get() int[source]

Atomically decrements the current value by one.

Returns:

The updated value, the current value decremented by one.

async get_and_decrement() int[source]

Atomically decrements the current value by one.

Returns:

The old value.

async get() int[source]

Gets the current value.

Returns:

The current value.

async get_and_add(delta: int) int[source]

Atomically adds the given value to the current value.

Parameters:

delta – The value to add to the current value.

Returns:

The old value before the add.

async get_and_set(new_value: int) int[source]

Atomically sets the given value and returns the old value.

Parameters:

new_value – The new value.

Returns:

The old value.

async increment_and_get() int[source]

Atomically increments the current value by one.

Returns:

The updated value, the current value incremented by one.

async get_and_increment() int[source]

Atomically increments the current value by one.

Returns:

The old value.

async set(new_value: int) None[source]

Atomically sets the given value.

Parameters:

new_value – The new value

async alter(function: Any) None[source]

Alters the currently stored value by applying a function on it.

Notes

function must be an instance of Hazelcast serializable type. It must have a counterpart registered in the server-side that implements the com.hazelcast.core.IFunction interface with the actual logic of the function to be applied.

Parameters:

function – The function that alters the currently stored value.

async alter_and_get(function: Any) int[source]

Alters the currently stored value by applying a function on it and gets the result.

Notes

function must be an instance of Hazelcast serializable type. It must have a counterpart registered in the server-side that implements the com.hazelcast.core.IFunction interface with the actual logic of the function to be applied.

Parameters:

function – The function that alters the currently stored value.

Returns:

The new value.

async get_and_alter(function: Any) int[source]

Alters the currently stored value by applying a function on it and gets the old value.

Notes

function must be an instance of Hazelcast serializable type. It must have a counterpart registered in the server-side that implements the com.hazelcast.core.IFunction interface with the actual logic of the function to be applied.

Parameters:

function – The function that alters the currently stored value.

Returns:

The old value.

async apply(function: Any) Any[source]

Applies a function on the value, the actual stored value will not change.

Notes

function must be an instance of Hazelcast serializable type. It must have a counterpart registered in the server-side that implements the com.hazelcast.core.IFunction interface with the actual logic of the function to be applied.

Parameters:

function – The function applied to the currently stored value.

Returns:

The result of the function application.

class AtomicReference(context, group_id, service_name, proxy_name, object_name)[source]

Bases: BaseCPProxy, Generic[ElementType]

A distributed, highly available object reference with atomic operations.

AtomicReference offers linearizability during crash failures and network partitions. It is CP with respect to the CAP principle. If a network partition occurs, it remains available on at most one side of the partition.

The following are some considerations you need to know when you use AtomicReference:

  • AtomicReference works based on the byte-content and not on the object-reference. If you use the compare_and_set() method, do not change the original value because its serialized content will then be different.

  • All methods returning an object return a private copy. You can modify the private copy, but the rest of the world is shielded from your changes. If you want these changes to be visible to the rest of the world, you need to write the change back to the AtomicReference; but be careful about introducing a data-race.

  • The in-memory format of an AtomicReference is binary. The receiving side does not need to have the class definition available unless it needs to be deserialized on the other side., e.g., because a method like alter() is executed. This deserialization is done for every call that needs to have the object instead of the binary content, so be careful with expensive object graphs that need to be deserialized.

  • If you have an object with many fields or an object graph, and you only need to calculate some information or need a subset of fields, you can use the apply() method. With the apply() method, the whole object does not need to be sent over the line; only the information that is relevant is sent.

IAtomicReference does not offer exactly-once / effectively-once execution semantics. It goes with at-least-once execution semantics by default and can cause an API call to be committed multiple times in case of CP member failures. It can be tuned to offer at-most-once execution semantics. Please see fail-on-indeterminate-operation-state server-side setting.

async compare_and_set(expect: ElementType | None, update: ElementType | None) bool[source]

Atomically sets the value to the given updated value only if the current value is equal to the expected value.

Parameters:
  • expect – The expected value.

  • update – The new value.

Returns:

True if successful, or False if the actual value was not equal to the expected value.

async get() ElementType | None[source]

Gets the current value.

Returns:

The current value.

async set(new_value: ElementType | None) None[source]

Atomically sets the given value.

Parameters:

new_value – The new value.

async get_and_set(new_value: ElementType | None) ElementType | None[source]

Gets the old value and sets the new value.

Parameters:

new_value – The new value.

Returns:

The old value.

async is_none() bool[source]

Checks if the stored reference is None.

Returns:

True if the stored reference is None, False otherwise.

async clear() None[source]

Clears the current stored reference, so it becomes None.

async contains(value: ElementType | None) bool[source]

Checks if the reference contains the value.

Parameters:

value – The value to check (is allowed to be None).

Returns:

True if the value is found, False otherwise.

async alter(function: Any) None[source]

Alters the currently stored reference by applying a function on it.

Notes

function must be an instance of Hazelcast serializable type. It must have a counterpart registered in the server-side that implements the com.hazelcast.core.IFunction interface with the actual logic of the function to be applied.

Parameters:

function – The function that alters the currently stored reference.

async alter_and_get(function: Any) ElementType | None[source]

Alters the currently stored reference by applying a function on it and gets the result.

Notes

function must be an instance of Hazelcast serializable type. It must have a counterpart registered in the server-side that implements the com.hazelcast.core.IFunction interface with the actual logic of the function to be applied.

Parameters:

function – The function that alters the currently stored reference.

Returns:

The new value, the result of the applied function.

async get_and_alter(function: Any) ElementType | None[source]

Alters the currently stored reference by applying a function on it on and gets the old value.

Notes

function must be an instance of Hazelcast serializable type. It must have a counterpart registered in the server-side that implements the com.hazelcast.core.IFunction interface with the actual logic of the function to be applied.

Parameters:

function – The function that alters the currently stored reference.

Returns:

The old value, the value before the function is applied.

async apply(function: Any) ElementType | None[source]

Applies a function on the value, the actual stored value will not change.

Notes

function must be an instance of Hazelcast serializable type. It must have a counterpart registered in the server-side that implements the com.hazelcast.core.IFunction interface with the actual logic of the function to be applied.

Parameters:

function – The function applied on the currently stored reference.

Returns:

The result of the function application.

class CPSubsystem(context)[source]

Bases: object

CP Subsystem is a component of Hazelcast that builds a strongly consistent layer for a set of distributed data structures.

Its APIs can be used for implementing distributed coordination use cases, such as leader election, distributed locking, synchronization, and metadata management.

Its data structures are CP with respect to the CAP principle, i.e., they always maintain linearizability and prefer consistency to availability during network partitions. Besides network partitions, CP Subsystem withstands server and client failures.

Data structures in CP Subsystem run in CP groups. Each CP group elects its own Raft leader and runs the Raft consensus algorithm independently.

The CP data structures differ from the other Hazelcast data structures in two aspects. First, an internal commit is performed on the METADATA CP group every time you fetch a proxy from this interface. Hence, callers should cache returned proxy objects. Second, if you call destroy() on a CP data structure proxy, that data structure is terminated on the underlying CP group and cannot be reinitialized until the CP group is force-destroyed. For this reason, please make sure that you are completely done with a CP data structure before destroying its proxy.

async get_atomic_long(name: str) AtomicLong[source]

Returns the distributed AtomicLong instance with given name.

The instance is created on CP Subsystem.

If no group name is given within the name argument, then the AtomicLong instance will be created on the default CP group. If a group name is given, like .get_atomic_long("myLong@group1"), the given group will be initialized first, if not initialized already, and then the instance will be created on this group.

Parameters:

name – Name of the AtomicLong.

Returns:

The AtomicLong proxy for the given name.

async get_atomic_reference(name: str) AtomicReference[source]

Returns the distributed AtomicReference instance with given name.

The instance is created on CP Subsystem.

If no group name is given within the name argument, then the AtomicReference instance will be created on the DEFAULT CP group. If a group name is given, like .get_atomic_reference("myRef@group1"), the given group will be initialized first, if not initialized already, and then the instance will be created on this group.

Parameters:

name – Name of the AtomicReference.

Returns:

The AtomicReference proxy for the given name.

async get_count_down_latch(name: str) CountDownLatch[source]

Returns the distributed CountDownLatch instance with given name.

The instance is created on CP Subsystem.

If no group name is given within the name argument, then the CountDownLatch instance will be created on the DEFAULT CP group. If a group name is given, like .get_count_down_latch("myLatch@group1"), the given group will be initialized first, if not initialized already, and then the instance will be created on this group.

Parameters:

name – Name of the CountDownLatch.

Returns:

The CountDownLatch proxy for the given name.

async get_semaphore(name: str) Semaphore[source]

Returns the distributed Semaphore instance with given name.

The instance is created on CP Subsystem.

If no group name is given within the name argument, then the Semaphore instance will be created on the DEFAULT CP group. If a group name is given, like .get_semaphore("mySemaphore@group1"), the given group will be initialized first, if not initialized already, and then the instance will be created on this group.

Parameters:

name – Name of the Semaphore

Returns:

The Semaphore proxy for the given name.

class CountDownLatch(context, group_id, service_name, proxy_name, object_name)[source]

Bases: BaseCPProxy

A distributed, concurrent countdown latch data structure.

CountDownLatch is a cluster-wide synchronization aid that allows one or more callers to wait until a set of operations being performed in other callers completes.

CountDownLatch count can be reset using try_set_count() method after a countdown has finished but not during an active count. This allows the same latch instance to be reused.

There is no await_latch() method to wait indefinitely since this is undesirable in a distributed application: for example, a cluster can split or the master and replicas could all terminate. In most cases, it is best to configure an explicit timeout, so you have the ability to deal with these situations.

All the API methods in the CountDownLatch offer the exactly-once execution semantics. For instance, even if a count_down() call is internally retried because of crashed Hazelcast member, the counter value is decremented only once.

async await_latch(timeout: float) bool[source]

Causes the current thread to wait until the latch has counted down to zero, or an exception is thrown, or the specified waiting time elapses.

If the current count is zero then this method returns True.

If the current count is greater than zero, then the current thread becomes disabled for thread scheduling purposes and lies dormant until one of the following things happen:

  • The count reaches zero due to invocations of the count_down() method

  • This CountDownLatch instance is destroyed

  • The countdown owner becomes disconnected

  • The specified waiting time elapses

If the count reaches zero, then the method returns with the value True.

If the specified waiting time elapses then the value False is returned. If the time is less than or equal to zero, the method will not wait at all.

Parameters:

timeout – The maximum time to wait in seconds

Returns:

True if the count reached zero, False if the waiting time elapsed before the count reached zero

Raises:

IllegalStateError – If the Hazelcast instance was shut down while waiting.

async count_down() None[source]

Decrements the count of the latch, releasing all waiting threads if the count reaches zero.

If the current count is greater than zero, then it is decremented. If the new count is zero:

  • All waiting threads are re-enabled for thread scheduling purposes

  • Countdown owner is set to None.

If the current count equals zero, then nothing happens.

async get_count() int[source]

Returns the current count.

Returns:

The current count.

async try_set_count(count: int) bool[source]

Sets the count to the given value if the current count is zero.

If count is not zero, then this method does nothing and returns False.

Parameters:

count – The number of times count_down() must be invoked before callers can pass through await_latch().

Returns:

True if the new count was set, False if the current count is not zero.

class FlakeIdGenerator(service_name, name, context)[source]

Bases: Proxy

A cluster-wide unique ID generator. Generated IDs are int values and are k-ordered (roughly ordered). IDs are in the range from 0 to 2^63 - 1.

The IDs contain a timestamp component and a node ID component, which is assigned when the member joins the cluster. This allows the IDs to be ordered and unique without any coordination between members, which makes the generator safe even in split-brain scenario.

Timestamp component is in milliseconds since 1.1.2018, 0:00 UTC and has 41 bits. This caps the useful lifespan of the generator to little less than 70 years (until ~2088). The sequence component is 6 bits. If more than 64 IDs are requested in single millisecond, IDs will gracefully overflow to the next millisecond and uniqueness is guaranteed in this case. The implementation does not allow overflowing by more than 15 seconds, if IDs are requested at higher rate, the call will block. Note, however, that clients are able to generate even faster because each call goes to a different (random) member and the 64 IDs/ms limit is for single member.

Node ID overflow:

It is possible to generate IDs on any member or client as long as there is at least one member with join version smaller than 2^16 in the cluster. The remedy is to restart the cluster: nodeId will be assigned from zero again. Uniqueness after the restart will be preserved thanks to the timestamp component.

Warning

Asyncio client FlakeIdGenerator proxy is not thread-safe, do not access it from other threads.

async new_id() int[source]

Generates and returns a cluster-wide unique ID.

This method goes to a random member and gets a batch of IDs, which will then be returned locally for a limited time. The pre-fetch size and the validity time can be configured.

Note

Values returned from this method may not be strictly ordered.

Returns:

New cluster-wide unique ID.

Raises:

HazelcastError – If node ID for all members in the cluster is out of valid range. See Node ID overflow note above.

class Executor(service_name: str, name: str, context)[source]

Bases: Proxy

An object that executes submitted executable tasks.

async execute_on_key_owner(key: Any, task: Any) Any[source]

Executes a task on the owner of the specified key.

Parameters:
  • key – The specified key.

  • task – A task executed on the owner of the specified key.

Returns:

The result of the task.

async execute_on_member(member: MemberInfo, task: Any) Any[source]

Executes a task on the specified member.

Parameters:
  • member – The specified member.

  • task – The task executed on the specified member.

Returns:

The result of the task.

async execute_on_members(members: Sequence[MemberInfo], task: Any) List[Any][source]

Executes a task on each of the specified members.

Parameters:
  • members – The specified members.

  • task – The task executed on the specified members.

Returns:

The list of results of the tasks on each member.

async execute_on_all_members(task: Any) List[Any][source]

Executes a task on all the known cluster members.

Parameters:

task – The task executed on the all the members.

Returns:

The list of results of the tasks on each member.

async is_shutdown() bool[source]

Determines whether this executor has been shutdown or not.

Returns:

True if the executor has been shutdown, False otherwise.

async shutdown() None[source]

Initiates a shutdown process which works orderly. Tasks that were submitted before shutdown are executed but new task will not be accepted.

class HazelcastClient(config: Config | None = None, **kwargs)[source]

Bases: object

Hazelcast client instance to access and manipulate distributed data structures on the Hazelcast clusters.

The client can be configured either by:

  • providing a configuration object as the first parameter of the constructor

from hazelcast.asyncio import HazelcastClient
from hazelcast.config import Config

config = Config()
config.cluster_name = "a-cluster"
client = await HazelcastClient.create_and_start(config)
  • passing configuration options as keyword arguments

from hazelcast.asyncio import HazelcastClient

client = await HazelcastClient.create_and_start(
    cluster_name="a-cluster",
)

Warning

Asyncio client is not thread-safe, do not access it from other threads.

See the hazelcast.config.Config documentation for the possible configuration options.

Creates a HazelcastClient instance.

This call just creates the instance, without starting it.

The preferred way of creating and starting the client instance is using the create_and_start method:

from hazelcast.asyncio import HazelcastClient

client = await HazelcastClient.create_and_start()

See the hazelcast.config.Config documentation for the possible configuration options.

Parameters:
  • config – Optional configuration object.

  • **kwargs – Optional keyword arguments of the client configuration.

async classmethod create_and_start(config: Config | None = None, **kwargs) HazelcastClient[source]

Creates a HazelcastClient instance, and starts it.

from hazelcast.asyncio import HazelcastClient

client = await HazelcastClient.create_and_start()

See the hazelcast.config.Config documentation for the possible configuration options.

Parameters:
  • config – Optional configuration object.

  • **kwargs – Optional keyword arguments of the client configuration.

async get_executor(name: str) Executor[source]

Returns the executor instance with the specified name.

Parameters:

name – Name of the executor.

Returns:

Executor instance with the specified name.

async get_list(name: str) List[KeyType][source]

Returns the distributed list instance with the specified name.

Parameters:

name – Name of the distributed list.

Returns:

Distributed list instance with the specified name.

async get_map(name: str) Map[KeyType, ValueType][source]

Returns the distributed map instance with the specified name.

Parameters:

name – Name of the distributed map.

Returns:

Distributed map instance with the specified name.

async get_multi_map(name: str) MultiMap[KeyType, ValueType][source]

Returns the distributed MultiMap instance with the specified name.

Parameters:

name – Name of the distributed MultiMap.

Returns:

Distributed MultiMap instance with the specified name.

async get_queue(name: str) Queue[KeyType][source]

Returns the distributed queue instance with the specified name.

Parameters:

name – Name of the distributed queue.

Returns:

Distributed queue instance with the specified name.

async get_set(name: str) Set[KeyType][source]

Returns the distributed set instance with the specified name.

Parameters:

name – Name of the distributed set.

Returns:

Distributed set instance with the specified name.

async get_replicated_map(name: str) ReplicatedMap[KeyType, ValueType][source]

Returns the distributed ReplicatedMap instance with the specified name.

Parameters:

name – Name of the distributed replicated map.

Returns:

Distributed ReplicatedMap instance with the specified name.

async get_flake_id_generator(name: str) FlakeIdGenerator[source]

Returns the FlakeIdGenerator instance with the specified name.

Parameters:

name – Name of the FlakeIdGenerator.

Returns:

FlakeIdGenerator instance with the specified name.

async get_reliable_topic(name: str) ReliableTopic[source]

Returns the ReliableTopic instance with the specified name.

Parameters:

name – Name of the ReliableTopic.

Returns:

Distributed ReliableTopic instance with the specified name.

async get_ringbuffer(name: str) Ringbuffer[ItemType][source]

Returns the distributed Ringbuffer instance with the specified name.

Parameters:

name – Name of the distributed ringbuffer.

Returns:

Distributed Ringbuffer instance with the specified name.

async get_pn_counter(name: str) PNCounter[source]

Returns the PN Counter instance with the specified name.

Parameters:

name – Name of the PN Counter.

Returns:

Distributed PN Counter instance with the specified name.

async get_topic(name: str) Topic[MessageType][source]

Returns the distributed topic instance with the specified name.

Parameters:

name – Name of the distributed topic.

Returns:

Distributed topic instance with the specified name.

async create_vector_collection_config(name: str, indexes: List[IndexConfig], backup_count: int = 1, async_backup_count: int = 0, split_brain_protection_name: str | None = None, merge_policy: str = 'PutIfAbsentMergePolicy', merge_batch_size: int = 100) None[source]

Creates a vector collection with the given configuration.

Parameters:
async get_vector_collection(name: str) VectorCollection[source]

Returns the vector collection instance with the specified name.

Parameters:

name – Name of the vector collection.

Returns:

Vector collection instance with the specified name.

async add_distributed_object_listener(listener_func: Callable[[DistributedObjectEvent], None]) str[source]

Adds a listener which will be notified when a new distributed object is created or destroyed.

Parameters:

listener_func – Function to be called when a distributed object is created or destroyed.

Returns:

A registration id which is used as a key to remove the listener.

async remove_distributed_object_listener(registration_id: str) bool[source]

Removes the specified distributed object listener.

Returns silently if there is no such listener added before.

Parameters:

registration_id – The id of the registered listener.

Returns:

True if registration is removed, False otherwise.

async shutdown() None[source]

Shuts down this HazelcastClient.

property name: str

Name of the client.

property lifecycle_service: LifecycleService

Lifecycle service allows you to check if the client is running and add and remove lifecycle listeners.

property partition_service: PartitionService

Partition service allows you to get partition count, introspect the partition owners, and partition ids of keys.

property cluster_service: ClusterService

Cluster service allows you to get the list of the cluster members and add and remove membership listeners.

Type:

ClusterService

property sql: SqlService

Returns a service to execute distributed SQL queries.

property cp_subsystem: CPSubsystem

CP Subsystem offers set of in-memory linearizable data structures.

class List(service_name, name, context)[source]

Bases: PartitionSpecificProxy, Generic[ItemType]

Concurrent, distributed implementation of List.

The Hazelcast List is not a partitioned data-structure. So all the content of the List is stored in a single machine (and in the backup). So the List will not scale by adding more members in the cluster.

Example

>>> my_list = await client.get_list("my_list")
>>> print("list.add", await my_list.add("item"))
>>> print("list.size", await my_list.size())

Warning

Asyncio client list proxy is not thread-safe, do not access it from other threads.

async add(item: ItemType) bool[source]

Adds the specified item to the end of this list.

Parameters:

item – the specified item to be appended to this list.

Returns:

True if item is added, False otherwise.

async add_at(index: int, item: ItemType) None[source]

Adds the specified item at the specific position in this list. Element in this position and following elements are shifted to the right, if any.

Parameters:
  • index – The specified index to insert the item.

  • item – The specified item to be inserted.

async add_all(items: Sequence[ItemType]) bool[source]

Adds all of the items in the specified collection to the end of this list.

The order of new elements is determined by the specified collection’s iterator.

Parameters:

items – The specified collection which includes the elements to be added to list.

Returns:

True if this call changed the list, False otherwise.

async add_all_at(index: int, items: Sequence[ItemType]) bool[source]

Adds all of the elements in the specified collection into this list at the specified position.

Elements in this positions and following elements are shifted to the right, if any. The order of new elements is determined by the specified collection’s iterator.

Parameters:
  • index – The specified index at which the first element of specified collection is added.

  • items – The specified collection which includes the elements to be added to list.

Returns:

True if this call changed the list, False otherwise.

async add_listener(include_value: bool = False, item_added_func: Callable[[ItemEvent[ItemType]], None] = None, item_removed_func: Callable[[ItemEvent[ItemType]], None] = None) str[source]

Adds an item listener for this list. Listener will be notified for all list add/remove events.

Parameters:
  • include_value – Whether received events include the updated item or not.

  • item_added_func – To be called when an item is added to this list.

  • item_removed_func – To be called when an item is deleted from this list.

Returns:

A registration id which is used as a key to remove the listener.

async clear() None[source]

Clears the list.

List will be empty with this call.

async contains(item: ItemType) bool[source]

Determines whether this list contains the specified item or not.

Parameters:

item – The specified item.

Returns:

True if the specified item exists in this list, False otherwise.

async contains_all(items: Sequence[ItemType]) bool[source]

Determines whether this list contains all of the items in specified collection or not.

Parameters:

items – The specified collection which includes the items to be searched.

Returns:

True if all of the items in specified collection exist in this list, False otherwise.

async get(index: int) ItemType[source]

Returns the item which is in the specified position in this list.

Parameters:

index – the specified index of the item to be returned.

Returns:

The item in the specified position in this list.

async get_all() List[ItemType][source]

Returns all the items in this list.

Returns:

All the items in this list.

async iterator() List[ItemType][source]

Returns an iterator over the elements in this list in proper sequence, same with get_all.

Returns:

All the items in this list.

async index_of(item: ItemType) int[source]

Returns the first index of specified item’s occurrences in this list.

If specified item is not present in this list, returns -1.

Parameters:

item – The specified item to be searched for.

Returns:

The first index of specified item’s occurrences, -1 if item is not present in this list.

async is_empty() bool[source]

Determines whether this list is empty or not.

Returns:

True if the list contains no elements, False otherwise.

async last_index_of(item: ItemType) int[source]

Returns the last index of specified item’s occurrences in this list.

If specified item is not present in this list, returns -1.

Parameters:

item – The specified item to be searched for.

Returns:

The last index of specified item’s occurrences, -1 if item is not present in this list.

async list_iterator(index: int = 0) List[ItemType][source]

Returns a list iterator of the elements in this list.

If an index is provided, iterator starts from this index.

Parameters:

index – Index of first element to be returned from the list iterator.

Returns:

List of the elements in this list.

async remove(item: ItemType) bool[source]

Removes the specified element’s first occurrence from the list if it exists in this list.

Parameters:

item – The specified element.

Returns:

True if the specified element is present in this list, False otherwise.

async remove_at(index: int) ItemType[source]

Removes the item at the specified position in this list.

Element in this position and following elements are shifted to the left, if any.

Parameters:

index – Index of the item to be removed.

Returns:

The item previously at the specified index.

async remove_all(items: Sequence[ItemType]) bool[source]

Removes all of the elements that is present in the specified collection from this list.

Parameters:

items – The specified collection.

Returns:

True if this list changed as a result of the call, False otherwise.

async remove_listener(registration_id: str) bool[source]

Removes the specified item listener.

Returns silently if the specified listener was not added before.

Parameters:

registration_id – Id of the listener to be deleted.

Returns:

True if the item listener is removed, False otherwise.

async retain_all(items: Sequence[ItemType]) bool[source]

Retains only the items that are contained in the specified collection.

It means, items which are not present in the specified collection are removed from this list.

Parameters:

items – Collections which includes the elements to be retained in this list.

Returns:

True if this list changed as a result of the call, False otherwise.

async size() int[source]

Returns the number of elements in this list.

Returns:

Number of elements in this list.

async set_at(index: int, item: ItemType) ItemType[source]

Replaces the specified element with the element at the specified position in this list.

Parameters:
  • index – Index of the item to be replaced.

  • item – Item to be stored.

Returns:

The previous item in the specified index.

async sub_list(from_index: int, to_index: int) List[ItemType][source]

Returns a sublist from this list, from from_index(inclusive) to to_index(exclusive).

The returned list is backed by this list, so non-structural changes in the returned list are reflected in this list, and vice-versa.

Parameters:
  • from_index – The start point(inclusive) of the sub_list.

  • to_index – The end point(exclusive) of the sub_list.

Returns:

A view of the specified range within this list.

class Map(service_name, name, context)[source]

Bases: Proxy, Generic[KeyType, ValueType]

Hazelcast Map client proxy to access the map on the cluster.

Concurrent, distributed, observable and queryable map.

Example

>>> my_map = await client.get_map("my_map")
>>> print("map.put", await my_map.put("key", "value"))
>>> print("map.contains_key", await my_map.contains_key("key"))
>>> print("map.get", await my_map.get("key"))
>>> print("map.size", await my_map.size())

This class does not allow None to be used as a key or value.

Warning

Asyncio client map proxy is not thread-safe, do not access it from other threads.

async add_entry_listener(include_value: bool = False, key: KeyType = None, predicate: Predicate = None, added_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None, removed_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None, updated_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None, evicted_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None, evict_all_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None, clear_all_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None, merged_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None, expired_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None, loaded_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None) str[source]

Adds a continuous entry listener for this map.

Listener will get notified for map events filtered with given parameters.

The listener functions must not block.

Parameters:
  • include_value – Whether received event should include the value or not.

  • key – Key for filtering the events.

  • predicate – Predicate for filtering the events.

  • added_func – Function to be called when an entry is added to map.

  • removed_func – Function to be called when an entry is removed from map.

  • updated_func – Function to be called when an entry is updated.

  • evicted_func – Function to be called when an entry is evicted from map.

  • evict_all_func – Function to be called when entries are evicted from map.

  • clear_all_func – Function to be called when entries are cleared from map.

  • merged_func – Function to be called when WAN replicated entry is merged.

  • expired_func – Function to be called when an entry’s live time is expired.

  • loaded_func – Function to be called when an entry is loaded from a map loader.

Returns:

A registration id which is used as a key to remove the listener.

async add_index(attributes: Sequence[str] = None, index_type: int | str = 0, name: str = None, bitmap_index_options: Dict[str, Any] = None) None[source]

Adds an index to this map for the specified entries so that queries can run faster.

Example

Let’s say your map values are Employee objects.

>>> class Employee(IdentifiedDataSerializable):
>>>     active = false
>>>     age = None
>>>     name = None
>>>     #other fields
>>>
>>>     #methods

If you query your values mostly based on age and active fields, you should consider indexing these.

>>> employees = await client.get_map("employees")
>>> await employees.add_index(attributes=["age"]) # Sorted index for range queries
>>> await employees.add_index(attributes=["active"], index_type=IndexType.HASH)) # Hash index for equality predicates

Index attribute should either have a getter method or be public. You should also make sure to add the indexes before adding entries to this map.

Indexing time is executed in parallel on each partition by operation threads. The Map is not blocked during this operation. The time taken in proportional to the size of the Map and the number Members.

Until the index finishes being created, any searches for the attribute will use a full Map scan, thus avoiding using a partially built index and returning incorrect results.

Parameters:
  • attributes – List of indexed attributes.

  • index_type – Type of the index. By default, set to SORTED.

  • name – Name of the index.

  • bitmap_index_options

    Bitmap index options.

    • unique_key: (str): The unique key attribute is used as a source of values which uniquely identify each entry being inserted into an index. Defaults to KEY_ATTRIBUTE_NAME. See the hazelcast.config.QueryConstants for possible values.

    • unique_key_transformation (int|str): The transformation is applied to every value extracted from the unique key attribue. Defaults to OBJECT. See the hazelcast.config.UniqueKeyTransformation for possible values.

async add_interceptor(interceptor: Any) str[source]

Adds an interceptor for this map.

Added interceptor will intercept operations and execute user defined methods.

Parameters:

interceptor – Interceptor for the map which includes user defined methods.

Returns:

Id of registered interceptor.

async aggregate(aggregator: Aggregator[AggregatorResultType], predicate: Predicate = None) AggregatorResultType[source]

Applies the aggregation logic on map entries and filter the result with the predicate, if given.

Parameters:
  • aggregator – Aggregator to aggregate the entries with.

  • predicate – Predicate to filter the entries with.

Returns:

The result of the aggregation.

async clear() None[source]

Clears the map.

The MAP_CLEARED event is fired for any registered listeners.

async contains_key(key: KeyType) bool[source]

Determines whether this map contains an entry with the key.

Warning

This method uses __hash__ and __eq__ methods of binary form of the key, not the actual implementations of __hash__ and __eq__ defined in key’s class.

Parameters:

key – The specified key.

Returns:

True if this map contains an entry for the specified key, False otherwise.

async contains_value(value: ValueType) bool[source]

Determines whether this map contains one or more keys for the specified value.

Parameters:

value – The specified value.

Returns:

True if this map contains an entry for the specified value, False otherwise.

async delete(key: KeyType) None[source]

Removes the mapping for a key from this map if it is present (optional operation).

Unlike remove(object), this operation does not return the removed value, which avoids the serialization cost of the returned value. If the removed value will not be used, a delete operation is preferred over a remove operation for better performance.

The map will not contain a mapping for the specified key once the call returns.

Warning

This method breaks the contract of EntryListener. When an entry is removed by delete(), it fires an EntryEvent with a None old_value. Also, a listener with predicates will have None values, so only the keys can be queried via predicates.

Parameters:

key – Key of the mapping to be deleted.

async entry_set(predicate: Predicate = None) List[Tuple[KeyType, ValueType]][source]

Returns a list clone of the mappings contained in this map.

Warning

The list is NOT backed by the map, so changes to the map are NOT reflected in the list, and vice-versa.

Parameters:

predicate – Predicate for the map to filter entries.

Returns:

The list of key-value tuples in the map.

async evict(key: KeyType) bool[source]

Evicts the specified key from this map.

Warning

This method uses __hash__ and __eq__ methods of binary form of the key, not the actual implementations of __hash__ and __eq__ defined in key’s class.

Parameters:

key – Key to evict.

Returns:

True if the key is evicted, False otherwise.

async evict_all() None[source]

Evicts all keys from this map except the locked ones.

The EVICT_ALL event is fired for any registered listeners.

async execute_on_entries(entry_processor: Any, predicate: Predicate | None = None) List[Any][source]

Applies the user defined EntryProcessor to all the entries in the map or entries in the map which satisfies the predicate if provided. Returns the results mapped by each key in the map.

Parameters:
  • entry_processor – A stateful serializable object which represents the EntryProcessor defined on server side. This object must have a serializable EntryProcessor counter part registered on server side with the actual com.hazelcast.map.EntryProcessor implementation.

  • predicate – Predicate for filtering the entries.

Returns:

List of map entries which includes the keys and the results of the entry process.

async execute_on_key(key: KeyType, entry_processor: Any) Any[source]

Applies the user defined EntryProcessor to the entry mapped by the key. Returns the object which is the result of EntryProcessor’s process method.

Parameters:
  • key – Specified key for the entry to be processed.

  • entry_processor – A stateful serializable object which represents the EntryProcessor defined on server side. This object must have a serializable EntryProcessor counter part registered on server side with the actual com.hazelcast.map.EntryProcessor implementation.

Returns:

Result of entry process.

async execute_on_keys(keys: Sequence[KeyType], entry_processor: Any) List[Any][source]

Applies the user defined EntryProcessor to the entries mapped by the collection of keys. Returns the results mapped by each key in the collection.

Parameters:
  • keys – Collection of the keys for the entries to be processed.

  • entry_processor – A stateful serializable object which represents the EntryProcessor defined on server side. This object must have a serializable EntryProcessor counter part registered on server side with the actual com.hazelcast.map.EntryProcessor implementation.

Returns:

List of map entries which includes the keys and the results of the entry process.

async flush() None[source]

Flushes all the local dirty entries.

async get(key: KeyType) ValueType | None[source]

Returns the value for the specified key, or None if this map does not contain this key.

Warning

This method returns a clone of original value, modifying the returned value does not change the actual value in the map. One should put modified value back to make changes visible to all nodes.

>>> value = await my_map.get(key)
>>> value.update_some_property()
>>> await my_map.put(key,value)

Warning

This method uses __hash__ and __eq__ methods of binary form of the key, not the actual implementations of __hash__ and __eq__ defined in key’s class.

Parameters:

key – The specified key.

Returns:

The value for the specified key.

async get_all(keys: Sequence[KeyType]) Dict[KeyType, ValueType][source]

Returns the entries for the given keys.

Warning

The returned map is NOT backed by the original map, so changes to the original map are NOT reflected in the returned map, and vice-versa.

Warning

This method uses __hash__ and __eq__ methods of binary form of the key, not the actual implementations of __hash__ and __eq__ defined in key’s class.

Parameters:

keys – Keys to get.

Returns:

Dictionary of map entries.

async get_entry_view(key: KeyType) SimpleEntryView[KeyType, ValueType][source]

Returns the EntryView for the specified key.

Warning

This method returns a clone of original mapping, modifying the returned value does not change the actual value in the map. One should put modified value back to make changes visible to all nodes.

Warning

This method uses __hash__ and __eq__ methods of binary form of the key, not the actual implementations of __hash__ and __eq__ defined in key’s class.

Parameters:

key – The key of the entry.

Returns:

EntryView of the specified key.

async is_empty() bool[source]

Returns whether this map contains no key-value mappings or not.

Returns:

True if this map contains no key-value mappings, False otherwise.

async key_set(predicate: Predicate | None = None) List[ValueType][source]

Returns a List clone of the keys contained in this map or the keys of the entries filtered with the predicate if provided.

Warning

The list is NOT backed by the map, so changes to the map are NOT reflected in the list, and vice-versa.

Parameters:

predicate – Predicate to filter the entries.

Returns:

A list of the clone of the keys.

async load_all(keys: Sequence[KeyType] = None, replace_existing_values: bool = True) None[source]

Loads all keys from the store at server side or loads the given keys if provided.

Parameters:
  • keys – Keys of the entry values to load.

  • replace_existing_values – Whether the existing values will be replaced or not with those loaded from the server side MapLoader.

async project(projection: Projection[ProjectionType], predicate: Predicate = None) ProjectionType[source]

Applies the projection logic on map entries and filter the result with the predicate, if given.

Parameters:
  • projection – Projection to project the entries with.

  • predicate – Predicate to filter the entries with.

Returns:

The result of the projection.

async put(key: KeyType, value: ValueType, ttl: float = None, max_idle: float = None) ValueType | None[source]

Associates the specified value with the specified key in this map.

If the map previously contained a mapping for the key, the old value is replaced by the specified value. If ttl is provided, entry will expire and get evicted after the ttl.

Warning

This method returns a clone of the previous value, not the original (identically equal) value previously put into the map.

Warning

This method uses __hash__ and __eq__ methods of binary form of the key, not the actual implementations of __hash__ and __eq__ defined in key’s class.

Parameters:
  • key – The specified key.

  • value – The value to associate with the key.

  • ttl – Maximum time in seconds for this entry to stay in the map. If not provided, the value configured on the server side configuration will be used. Setting this to 0 means infinite time-to-live.

  • max_idle – Maximum time in seconds for this entry to stay idle in the map. If not provided, the value configured on the server side configuration will be used. Setting this to 0 means infinite max idle time.

Returns:

Previous value associated with key or None if there was no mapping for key.

async put_all(map: Dict[KeyType, ValueType]) None[source]

Copies all the mappings from the specified map to this map.

No atomicity guarantees are given. In the case of a failure, some key-value tuples may get written, while others are not.

Parameters:

map – Dictionary which includes mappings to be stored in this map.

async put_if_absent(key: KeyType, value: ValueType, ttl: float = None, max_idle: float = None) ValueType | None[source]

Associates the specified key with the given value if it is not already associated.

If ttl is provided, entry will expire and get evicted after the ttl.

This is equivalent to below, except that the action is performed atomically:

>>> if not (await my_map.contains_key(key)):
>>>     return await my_map.put(key,value)
>>> else:
>>>     return await my_map.get(key)

Warning

This method returns a clone of the previous value, not the original (identically equal) value previously put into the map.

Warning

This method uses __hash__ and __eq__ methods of binary form of the key, not the actual implementations of __hash__ and __eq__ defined in key’s class.

Parameters:
  • key – Key of the entry.

  • value – Value of the entry.

  • ttl – Maximum time in seconds for this entry to stay in the map. If not provided, the value configured on the server side configuration will be used. Setting this to 0 means infinite time-to-live.

  • max_idle – Maximum time in seconds for this entry to stay idle in the map. If not provided, the value configured on the server side configuration will be used. Setting this to 0 means infinite max idle time.

Returns:

Old value of the entry.

async put_transient(key: KeyType, value: ValueType, ttl: float = None, max_idle: float = None) None[source]

Same as put, but MapStore defined at the server side will not be called.

Warning

This method uses __hash__ and __eq__ methods of binary form of the key, not the actual implementations of __hash__ and __eq__ defined in key’s class.

Parameters:
  • key – Key of the entry.

  • value – Value of the entry.

  • ttl – Maximum time in seconds for this entry to stay in the map. If not provided, the value configured on the server side configuration will be used. Setting this to 0 means infinite time-to-live.

  • max_idle – Maximum time in seconds for this entry to stay idle in the map. If not provided, the value configured on the server side configuration will be used. Setting this to 0 means infinite max idle time.

async remove(key: KeyType) ValueType | None[source]

Removes the mapping for a key from this map if it is present.

The map will not contain a mapping for the specified key once the call returns.

Warning

This method uses __hash__ and __eq__ methods of binary form of the key, not the actual implementations of __hash__ and __eq__ defined in key’s class.

Parameters:

key – Key of the mapping to be deleted.

Returns:

The previous value associated with key, or None if there was no mapping for key.

async remove_all(predicate: Predicate) None[source]

Removes all entries which match with the supplied predicate.

Parameters:

predicate – Used to select entries to be removed from map.

async remove_if_same(key: KeyType, value: ValueType) bool[source]

Removes the entry for a key only if it is currently mapped to a given value.

This is equivalent to below, except that the action is performed atomically:

>>> if (await my_map.contains_key(key)) and (await my_map.get(key) == value):
>>>     await my_map.remove(key)
>>>     return True
>>> else:
>>>     return False

Warning

This method uses __hash__ and __eq__ methods of binary form of the key, not the actual implementations of __hash__ and __eq__ defined in key’s class.

Parameters:
  • key – The specified key.

  • value – Remove the key if it has this value.

Returns:

True if the value was removed, False otherwise.

async remove_entry_listener(registration_id: str) bool[source]

Removes the specified entry listener.

Returns silently if there is no such listener added before.

Parameters:

registration_id – Id of registered listener.

Returns:

True if registration is removed, False otherwise.

async remove_interceptor(registration_id: str) bool[source]

Removes the given interceptor for this map, so it will not intercept operations anymore.

Parameters:

registration_id – Registration ID of the map interceptor.

Returns:

True if the interceptor is removed, False otherwise.

async replace(key: KeyType, value: ValueType) ValueType | None[source]

Replaces the entry for a key only if it is currently mapped to some value.

This is equivalent to below, except that the action is performed atomically:

>>> if await my_map.contains_key(key):
>>>     return await my_map.put(key,value)
>>> else:
>>>     return None

Warning

This method uses __hash__ and __eq__ methods of binary form of the key, not the actual implementations of __hash__ and __eq__ defined in key’s class.

Warning

This method returns a clone of the previous value, not the original (identically equal) value previously put into the map.

Parameters:
  • key – The specified key.

  • value – The value to replace the previous value.

Returns:

Previous value associated with key, or None if there was no mapping for key.

async replace_if_same(key: ValueType, old_value: ValueType, new_value: ValueType) bool[source]

Replaces the entry for a key only if it is currently mapped to a given value.

This is equivalent to below, except that the action is performed atomically:

>>> if (await my_map.contains_key(key)) and (await my_map.get(key) == old_value):
>>>     await my_map.put(key, new_value)
>>>     return True
>>> else:
>>>     return False

Warning

This method uses __hash__ and __eq__ methods of binary form of the key, not the actual implementations of __hash__ and __eq__ defined in key’s class.

Parameters:
  • key – The specified key.

  • old_value – Replace the key value if it is the old value.

  • new_value – The new value to replace the old value.

Returns:

True if the value was replaced, False otherwise.

async set(key: KeyType, value: ValueType, ttl: float = None, max_idle: float = None) None[source]

Puts an entry into this map.

Similar to the put operation except that set doesn’t return the old value, which is more efficient. If ttl is provided, entry will expire and get evicted after the ttl.

Warning

This method uses __hash__ and __eq__ methods of binary form of the key, not the actual implementations of __hash__ and __eq__ defined in key’s class.

Parameters:
  • key – Key of the entry.

  • value – Value of the entry.

  • ttl – Maximum time in seconds for this entry to stay in the map. If not provided, the value configured on the server side configuration will be used. Setting this to 0 means infinite time-to-live.

  • max_idle – Maximum time in seconds for this entry to stay idle in the map. If not provided, the value configured on the server side configuration will be used. Setting this to 0 means infinite max idle time.

async set_ttl(key: KeyType, ttl: float) None[source]

Updates the TTL (time to live) value of the entry specified by the given key with a new TTL value.

New TTL value is valid starting from the time this operation is invoked, not since the time the entry was created. If the entry does not exist or is already expired, this call has no effect.

Parameters:
  • key – The key of the map entry.

  • ttl – Maximum time in seconds for this entry to stay in the map. Setting this to 0 means infinite time-to-live.

async size() int[source]

Returns the number of entries in this map.

Returns:

Number of entries in this map.

async try_put(key: KeyType, value: ValueType, timeout: float = 0) bool[source]

Tries to put the given key and value into this map and returns immediately if timeout is not provided.

If timeout is provided, operation waits until it is completed or timeout is reached.

Parameters:
  • key – Key of the entry.

  • value – Value of the entry.

  • timeout – Maximum time in seconds to wait.

Returns:

True if the put is successful, False otherwise.

async try_remove(key: KeyType, timeout: float = 0) bool[source]

Tries to remove the given key from this map and returns immediately if timeout is not provided.

If timeout is provided, operation waits until it is completed or timeout is reached.

Parameters:
  • key – Key of the entry to be deleted.

  • timeout – Maximum time in seconds to wait.

Returns:

True if the remove is successful, False otherwise.

async values(predicate: Predicate = None) List[ValueType][source]

Returns a list clone of the values contained in this map or values of the entries which are filtered with the predicate if provided.

Warning

The list is NOT backed by the map, so changes to the map are NOT reflected in the list, and vice-versa.

Parameters:

predicate – Predicate to filter the entries.

Returns:

A list of clone of the values contained in this map.

class ReliableMessageListener[source]

Bases: Generic[MessageType]

A message listener for ReliableTopic.

A message listener will not be called concurrently (provided that it’s not registered twice). So there is no need to synchronize access to the state it reads or writes.

If a regular function is registered on a reliable topic, the message listener works fine, but it can’t do much more than listen to messages.

This is an enhanced version of that to better integrate with the reliable topic.

Durable Subscription

The ReliableMessageListener allows you to control where you want to start processing a message when the listener is registered. This makes it possible to create a durable subscription by storing the sequence of the last message and using this as the sequence id to start from.

Error handling

The ReliableMessageListener also gives the ability to deal with errors using the is_terminal() method. If a plain function is used, then it won’t terminate on errors and it will keep on running. But in some cases it is better to stop running.

Global order

The ReliableMessageListener will always get all events in order (global order). It will not get duplicates and there will only be gaps if it is too slow. For more information see is_loss_tolerant().

Delivery guarantees

Because the ReliableMessageListener controls which item it wants to continue from upon restart, it is very easy to provide an at-least-once or at-most-once delivery guarantee. The store_sequence() is always called before a message is processed; so it can be persisted on some non-volatile storage. When the retrieve_initial_sequence() returns the stored sequence, then an at-least-once delivery is implemented since the same item is now being processed twice. To implement an at-most-once delivery guarantee, add 1 to the stored sequence when the retrieve_initial_sequence() is called.

on_message(message: TopicMessage[MessageType]) None[source]

Invoked when a message is received for the added reliable topic.

One should not block in this callback. If blocking is necessary, consider delegating that task to an executor or a thread pool.

Parameters:

message – The message that is received for the topic

retrieve_initial_sequence() int[source]

Retrieves the initial sequence from which this ReliableMessageListener should start.

Return -1 if there is no initial sequence and you want to start from the next published message.

If you intend to create a durable subscriber so you continue from where you stopped the previous time, load the previous sequence and add 1. If you don’t add one, then you will be receiving the same message twice.

Returns:

The initial sequence.

store_sequence(sequence: int) None[source]

Informs the ReliableMessageListener that it should store the sequence. This method is called before the message is processed. Can be used to make a durable subscription.

Parameters:

sequence – The sequence.

is_loss_tolerant() bool[source]

Checks if this ReliableMessageListener is able to deal with message loss. Even though the reliable topic promises to be reliable, it can be that a ReliableMessageListener is too slow. Eventually the message won’t be available anymore.

If the ReliableMessageListener is not loss tolerant and the topic detects that there are missing messages, it will terminate the ReliableMessageListener.

Returns:

True if the ReliableMessageListener is tolerant towards losing messages.

is_terminal(error: Exception) bool[source]

Checks if the ReliableMessageListener should be terminated based on an error raised while calling on_message().

Parameters:

error – The error raised while calling on_message()

Returns:

True if the ReliableMessageListener should terminate itself, False if it should keep on running.

on_cancel() None[source]

Called when the ReliableMessageListener is cancelled. This can happen when the listener is unregistered or cancelled due to an exception or during shutdown.

class ReliableTopic(service_name, name, context, ringbuffer)[source]

Bases: Proxy, Generic[MessageType]

Hazelcast provides distribution mechanism for publishing messages that are delivered to multiple subscribers, which is also known as a publish/subscribe (pub/sub) messaging model. Publish and subscriptions are cluster-wide. When a member subscribes for a topic, it is actually registering for messages published by any member in the cluster, including the new members joined after you added the listener.

Messages are ordered, meaning that listeners(subscribers) will process the messages in the order they are actually published.

Hazelcast’s Reliable Topic uses the same Topic interface as a regular topic. The main difference is that Reliable Topic is backed up by the Ringbuffer data structure, a replicated but not partitioned data structure that stores its data in a ring-like structure.

async publish(message: MessageType) None[source]

Publishes the message to all subscribers of this topic.

Parameters:

message – The message.

async publish_all(messages: Sequence[MessageType]) None[source]

Publishes all messages to all subscribers of this topic.

Parameters:

messages – Messages to publish.

async add_listener(listener: ReliableMessageListener | Callable[[TopicMessage[MessageType]], None]) str[source]

Subscribes to this reliable topic.

It can be either a simple function or an instance of an ReliableMessageListener. When a function is passed, a ReliableMessageListener is created out of that with sensible default values.

When a message is published, the ReliableMessageListener.on_message() method of the given listener (or the function passed) is called.

More than one message listener can be added on one instance.

Parameters:

listener – Listener to add.

Returns:

The registration id.

async remove_listener(registration_id: str) bool[source]

Stops receiving messages for the given message listener.

If the given listener already removed, this method does nothing.

Parameters:

registration_id – ID of listener registration.

Returns:

True if registration is removed, False otherwise.

async destroy() bool[source]

Destroys underlying Proxy and RingBuffer instances.

class MultiMap(service_name, name, context)[source]

Bases: Proxy, Generic[KeyType, ValueType]

A specialized map whose keys can be associated with multiple values.

Example

>>> my_map = await client.get_multi_map("my_map")
>>> print("put", await my_map.put("key", "value1"))
>>> print("get", await my_map.get("key"))

Warning

Asyncio client multi map proxy is not thread-safe, do not access it from other threads.

async add_entry_listener(include_value: bool = False, key: KeyType = None, added_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None, removed_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None, clear_all_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None) str[source]

Adds an entry listener for this multimap.

The listener will be notified for all multimap add/remove/clear-all events.

Parameters:
  • include_value – Whether received event should include the value or not.

  • key – Key for filtering the events.

  • added_func – Function to be called when an entry is added to map.

  • removed_func – Function to be called when an entry is removed from map.

  • clear_all_func – Function to be called when entries are cleared from map.

Returns:

A registration id which is used as a key to remove the listener.

async contains_key(key: KeyType) bool[source]

Determines whether this multimap contains an entry with the key.

Warning

This method uses __hash__ and __eq__ methods of binary form of the key, not the actual implementations of __hash__ and __eq__ defined in key’s class.

Parameters:

key – The specified key.

Returns:

True if this multimap contains an entry for the specified key, False otherwise.

async contains_value(value: ValueType) bool[source]

Determines whether this map contains one or more keys for the specified value.

Parameters:

value – The specified value.

Returns:

True if this multimap contains an entry for the specified value, False otherwise.

async contains_entry(key: KeyType, value: ValueType) bool[source]

Returns whether the multimap contains an entry with the value.

Parameters:
  • key – The specified key.

  • value – The specified value.

Returns:

True if this multimap contains the key-value tuple, False otherwise.

async clear() None[source]

Clears the multimap. Removes all key-value tuples.

async entry_set() List[Tuple[KeyType, ValueType]][source]

Returns the list of key-value tuples in the multimap.

Warning

The list is NOT backed by the map, so changes to the map are NOT reflected in the list, and vice-versa.

Returns:

The list of key-value tuples in the multimap.

async get(key: KeyType) List[ValueType] | None[source]

Returns the list of values associated with the key. None if this map does not contain this key.

Warning

This method uses __hash__ and __eq__ of the binary form of the key, not the actual implementations of __hash__ and __eq__ defined in the key’s class.

Warning

The list is NOT backed by the multimap, so changes to the map are not reflected in the collection, and vice-versa.

Parameters:

key – The specified key.

Returns:

The list of the values associated with the specified key.

async key_set() List[KeyType][source]

Returns the list of keys in the multimap.

Warning

The list is NOT backed by the map, so changes to the map are NOT reflected in the list, and vice-versa.

Returns:

A list of the clone of the keys.

async remove(key: KeyType, value: ValueType) bool[source]

Removes the given key-value tuple from the multimap.

Warning

This method uses __hash__ and __eq__ methods of binary form of the key, not the actual implementations of __hash__ and __eq__ defined in key’s class.

Parameters:
  • key – The key of the entry to remove.

  • value – The value of the entry to remove.

Returns:

True if the size of the multimap changed after the remove operation, False otherwise.

async remove_all(key: KeyType) List[ValueType][source]

Removes all the entries with the given key and returns the value list associated with this key.

Warning

This method uses __hash__ and __eq__ methods of binary form of the key, not the actual implementations of __hash__ and __eq__ defined in key’s class.

Warning

The returned list is NOT backed by the map, so changes to the map are NOT reflected in the list, and vice-versa.

Parameters:

key – The key of the entries to remove.

Returns:

The collection of removed values associated with the given key.

async put(key: KeyType, value: ValueType) bool[source]

Stores a key-value tuple in the multimap.

Warning

This method uses __hash__ and __eq__ methods of binary form of the key, not the actual implementations of __hash__ and __eq__ defined in key’s class.

Parameters:
  • key – The key to be stored.

  • value – The value to be stored.

Returns:

True if size of the multimap is increased, False if the multimap already contains the key-value tuple.

async put_all(multimap: Dict[KeyType, Sequence[ValueType]]) None[source]

Stores the given Map in the MultiMap.

The results of concurrently mutating the given map are undefined. No atomicity guarantees are given. It could be that in case of failure some of the key/value-pairs get written, while others are not.

Warning

This method uses __hash__ and __eq__ methods of binary form of the key, not the actual implementations of __hash__ and __eq__ defined in key’s class.

Parameters:

multimap – the map corresponds to multimap entries.

async remove_entry_listener(registration_id: str) bool[source]

Removes the specified entry listener.

Returns silently if there is no such listener added before.

Parameters:

registration_id – Id of registered listener.

Returns:

True if registration is removed, False otherwise.

async size() int[source]

Returns the number of entries in this multimap.

Returns:

Number of entries in this multimap.

async value_count(key: KeyType) int[source]

Returns the number of values that match the given key in the multimap.

Warning

This method uses __hash__ and __eq__ methods of binary form of the key, not the actual implementations of __hash__ and __eq__ defined in key’s class.

Parameters:

key – The key whose values count is to be returned.

Returns:

The number of values that match the given key in the multimap.

async values() List[ValueType][source]

Returns the list of values in the multimap.

Warning

The returned list is NOT backed by the map, so changes to the map are NOT reflected in the list, and vice-versa.

Returns:

The list of values in the multimap.

class PNCounter(service_name, name, context)[source]

Bases: Proxy

PN (Positive-Negative) CRDT counter.

The counter supports adding and subtracting values as well as retrieving the current counter value. Each replica of this counter can perform operations locally without coordination with the other replicas, thus increasing availability. The counter guarantees that whenever two nodes have received the same set of updates, possibly in a different order, their state is identical, and any conflicting updates are merged automatically. If no new updates are made to the shared state, all nodes that can communicate will eventually have the same data.

When invoking updates from the client, the invocation is remote. This may lead to indeterminate state - the update may be applied but the response has not been received. In this case, the caller will be notified with a TargetDisconnectedError.

The read and write methods provide monotonic read and RYW (read-your-write) guarantees. These guarantees are session guarantees which means that if no replica with the previously observed state is reachable, the session guarantees are lost and the method invocation will throw a ConsistencyLostError. This does not mean that an update is lost. All of the updates are part of some replica and will be eventually reflected in the state of all other replicas. This exception just means that you cannot observe your own writes because all replicas that contain your updates are currently unreachable. After you have received a ConsistencyLostError, you can either wait for a sufficiently up-to-date replica to become reachable in which case the session can be continued or you can reset the session by calling the reset() method. If you have called the reset() method, a new session is started with the next invocation to a CRDT replica.

Notes

The CRDT state is kept entirely on non-lite (data) members. If there aren’t any and the methods here are invoked on a lite member, they will fail with an NoDataMemberInClusterError.

async get() int[source]

Returns the current value of the counter.

Returns:

The current value of the counter.

Raises:
async get_and_add(delta: int) int[source]

Adds the given value to the current value and returns the previous value.

Parameters:

delta – The value to add.

Returns:

The previous value.

Raises:
async add_and_get(delta: int) int[source]

Adds the given value to the current value and returns the updated value.

Parameters:

delta – The value to add.

Returns:

The updated value.

Raises:
async get_and_subtract(delta: int) int[source]

Subtracts the given value from the current value and returns the previous value.

Parameters:

delta – The value to subtract.

Returns:

The previous value.

Raises:
async subtract_and_get(delta: int) int[source]

Subtracts the given value from the current value and returns the updated value.

Parameters:

delta – The value to subtract.

Returns:

The updated value.

Raises:
async get_and_decrement() int[source]

Decrements the counter value by one and returns the previous value.

Returns:

The previous value.

Raises:
async decrement_and_get() int[source]

Decrements the counter value by one and returns the updated value.

Returns:

The updated value.

Raises:
async get_and_increment() int[source]

Increments the counter value by one and returns the previous value.

Returns:

The previous value.

Raises:
async increment_and_get() int[source]

Increments the counter value by one and returns the updated value.

Returns:

The updated value.

Raises:
reset() None[source]

Resets the observed state by this PN counter.

This method may be used after a method invocation has thrown a ConsistencyLostError to reset the proxy and to be able to start a new session.

class Queue(service_name, name, context)[source]

Bases: PartitionSpecificProxy, Generic[ItemType]

Concurrent, blocking, distributed, observable queue.

Queue is not a partitioned data-structure. All of the Queue content is stored in a single machine (and in the backup). Queue will not scale by adding more members in the cluster.

Example

>>> my_queue = await client.get_queue("my_queue")
>>> print("queue.offer", await my_queue.offer("item"))
>>> print("queue.size", await my_queue.size())

Warning

Asyncio client queue proxy is not thread-safe, do not access it from other threads.

async add(item: ItemType) bool[source]

Adds the specified item to this queue if there is available space.

Parameters:

item – The specified item.

Returns:

True if element is successfully added, False otherwise.

Raises:

IllegalStateError – If queue is full.

async add_all(items: Sequence[ItemType]) bool[source]

Adds the elements in the specified collection to this queue.

Parameters:

items – Collection which includes the items to be added.

Returns:

True if this queue is changed after call, False otherwise.

async add_listener(include_value: bool = False, item_added_func: Callable[[ItemEvent[ItemType]], None] = None, item_removed_func: Callable[[ItemEvent[ItemType]], None] = None) str[source]

Adds an item listener for this queue. Listener will be notified for all queue add/remove events.

Parameters:
  • include_value – Whether received events include the updated item or not.

  • item_added_func – Function to be called when an item is added to this queue.

  • item_removed_func – Function to be called when an item is deleted from this queue.

Returns:

A registration id which is used as a key to remove the listener.

async clear() None[source]

Clears this queue. Queue will be empty after this call.

async contains(item: ItemType) bool[source]

Determines whether this queue contains the specified item or not.

Parameters:

item – The specified item to be searched.

Returns:

True if the specified item exists in this queue, False otherwise.

async contains_all(items: Sequence[ItemType]) bool[source]

Determines whether this queue contains all of the items in the specified collection or not.

Parameters:

items – The specified collection which includes the items to be searched.

Returns:

True if all of the items in the specified collection exist in this queue, False otherwise.

async drain_to(target_list: List[ItemType], max_size: int = -1) int[source]

Transfers all available items to the given target_list and removes these items from this queue.

If a max_size is specified, it transfers at most the given number of items. In case of a failure, an item can exist in both collections or none of them.

This operation may be more efficient than polling elements repeatedly and putting into collection.

Parameters:
  • target_list – the list where the items in this queue will be transferred.

  • max_size – The maximum number items to transfer.

Returns:

Number of transferred items.

async iterator() List[ItemType][source]

Returns all the items in this queue.

Returns:

Collection of items in this queue.

async is_empty() bool[source]

Determines whether this queue is empty or not.

Returns:

True if this queue is empty, False otherwise.

async offer(item: ItemType, timeout: float = 0) bool[source]

Inserts the specified element into this queue if it is possible to do so immediately without violating capacity restrictions.

If there is no space currently available:

  • If the timeout is provided, it waits until this timeout elapses and returns the result.

  • If the timeout is not provided, returns False immediately.

Parameters:
  • item – The item to be added.

  • timeout – Maximum time in seconds to wait for addition.

Returns:

True if the element was added to this queue, False otherwise.

async peek() ItemType | None[source]

Retrieves the head of queue without removing it from the queue.

Returns:

The head of this queue, or None if this queue is empty.

async poll(timeout: float = 0) ItemType | None[source]

Retrieves and removes the head of this queue.

If this queue is empty:

  • If the timeout is provided, it waits until this timeout elapses and returns the result.

  • If the timeout is not provided, returns None.

Parameters:

timeout – Maximum time in seconds to wait for addition.

Returns:

The head of this queue, or None if this queue is empty or specified timeout elapses before an item is added to the queue.

async put(item: ItemType) None[source]

Adds the specified element into this queue.

If there is no space, it waits until necessary space becomes available.

Parameters:

item – The specified item.

async remaining_capacity() int[source]

Returns the remaining capacity of this queue.

Returns:

Remaining capacity of this queue.

async remove(item: ItemType) bool[source]

Removes the specified element from the queue if it exists.

Parameters:

item – The specified element to be removed.

Returns:

True if the specified element exists in this queue, False otherwise.

async remove_all(items: Sequence[ItemType]) bool[source]

Removes all of the elements of the specified collection from this queue.

Parameters:

items – The specified collection.

Returns:

True if the call changed this queue, False otherwise.

async remove_listener(registration_id: str) bool[source]

Removes the specified item listener.

Returns silently if the specified listener was not added before.

Parameters:

registration_id – Id of the listener to be deleted.

Returns:

True if the item listener is removed, False otherwise.

async retain_all(items: Sequence[ItemType]) bool[source]

Removes the items which are not contained in the specified collection.

In other words, only the items that are contained in the specified collection will be retained.

Parameters:

items – Collection which includes the elements to be retained in this queue.

Returns:

True if this queue changed as a result of the call, False otherwise.

async size() int[source]

Returns the number of elements in this collection.

If the size is greater than 2**31 - 1, it returns 2**31 - 1.

Returns:

Size of the queue.

async take() ItemType[source]

Retrieves and removes the head of this queue, if necessary, waits until an item becomes available.

Returns:

The head of this queue.

class ReplicatedMap(service_name, name, context)[source]

Bases: Proxy, Generic[KeyType, ValueType]

A ReplicatedMap is a map-like data structure with weak consistency and values locally stored on every node of the cluster.

Whenever a value is written asynchronously, the new value will be internally distributed to all existing cluster members, and eventually every node will have the new value.

When a new node joins the cluster, the new node initially will request existing values from older nodes and replicate them locally.

Example

>>> my_map = await client.get_replicated_map("my_map")
>>> print("put", await my_map.put("key", "value"))
>>> print("get", await my_map.get("key"))
async add_entry_listener(key: KeyType = None, predicate: Predicate = None, added_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None, removed_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None, updated_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None, evicted_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None, clear_all_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None) str[source]

Adds a continuous entry listener for this map.

Listener will get notified for map events filtered with given parameters.

Parameters:
  • key – Key for filtering the events.

  • predicate – Predicate for filtering the events.

  • added_func – Function to be called when an entry is added to map.

  • removed_func – Function to be called when an entry is removed from map.

  • updated_func – Function to be called when an entry is updated.

  • evicted_func – Function to be called when an entry is evicted from map.

  • clear_all_func – Function to be called when entries are cleared from map.

Returns:

A registration id which is used as a key to remove the listener.

async clear() None[source]

Wipes data out of the replicated map.

async contains_key(key: KeyType) bool[source]

Determines whether this map contains an entry with the key.

Warning

This method uses __hash__ and __eq__ methods of binary form of the key, not the actual implementations of __hash__ and __eq__ defined in key’s class.

Parameters:

key – The specified key.

Returns:

True if this map contains an entry for the specified key, False otherwise.

async contains_value(value: ValueType) bool[source]

Determines whether this map contains one or more keys for the specified value.

Parameters:

value – The specified value.

Returns:

True if this map contains an entry for the specified value, False otherwise.

async entry_set() List[Tuple[KeyType, ValueType]][source]

Returns a List clone of the mappings contained in this map.

Warning

The list is NOT backed by the map, so changes to the map are NOT reflected in the list, and vice-versa.

Returns:

The list of key-value tuples in the map.

async get(key: KeyType) ValueType | None[source]

Returns the value for the specified key, or None if this map does not contain this key.

Warning

This method uses __hash__ and __eq__ methods of binary form of the key, not the actual implementations of __hash__ and __eq__ defined in key’s class.

Parameters:

key – The specified key.

Returns:

The value associated with the specified key.

async is_empty() bool[source]

Returns True if this map contains no key-value mappings.

Returns:

True if this map contains no key-value mappings.

async key_set() List[KeyType][source]

Returns the list of keys in the ReplicatedMap.

Warning

The list is NOT backed by the map, so changes to the map are NOT reflected in the list, and vice-versa.

Returns:

A list of the clone of the keys.

async put(key: KeyType, value: ValueType, ttl: float = 0) ValueType | None[source]

Associates the specified value with the specified key in this map.

If the map previously contained a mapping for the key, the old value is replaced by the specified value. If ttl is provided, entry will expire and get evicted after the ttl.

Parameters:
  • key – The specified key.

  • value – The value to associate with the key.

  • ttl – Maximum time in seconds for this entry to stay, if not provided, the value configured on server side configuration will be used.

Returns:

Previous value associated with key or None if there was no mapping for key.

async put_all(source: Dict[KeyType, ValueType]) None[source]

Copies all the mappings from the specified map to this map.

No atomicity guarantees are given. In the case of a failure, some key-value tuples may get written, while others are not.

Parameters:

source – Map which includes mappings to be stored in this map.

async remove(key: KeyType) ValueType | None[source]

Removes the mapping for a key from this map if it is present.

The map will not contain a mapping for the specified key once the call returns.

Warning

This method uses __hash__ and __eq__ methods of binary form of the key, not the actual implementations of __hash__ and __eq__ defined in key’s class.

Parameters:

key – Key of the mapping to be deleted.

Returns:

The previous value associated with key, or None if there was no mapping for key.

async remove_entry_listener(registration_id: str) bool[source]

Removes the specified entry listener.

Returns silently if there is no such listener added before.

Parameters:

registration_id – Id of registered listener.

Returns:

True if registration is removed, False otherwise.

async size() int[source]

Returns the number of entries in this replicated map.

Returns:

Number of entries in this replicated map.

async values() List[ValueType][source]

Returns the list of values in the map.

Warning

The returned list is NOT backed by the map, so changes to the map are NOT reflected in the list, and vice-versa.

Returns:

The list of values in the map.

class Ringbuffer(service_name, name, context)[source]

Bases: PartitionSpecificProxy, Generic[ItemType]

A Ringbuffer is an append-only data-structure where the content is stored in a ring like structure.

A ringbuffer has a capacity so it won’t grow beyond that capacity and endanger the stability of the system. If that capacity is exceeded, then the oldest item in the ringbuffer is overwritten. The ringbuffer has two always incrementing sequences:

  • tail_sequence(): This is the side where the youngest item is found. So the tail is the side of the ringbuffer where items are added to.

  • head_sequence(): This is the side where the oldest items are found. So the head is the side where items gets discarded.

The items in the ringbuffer can be found by a sequence that is in between (inclusive) the head and tail sequence.

If data is read from a ringbuffer with a sequence that is smaller than the head sequence, it means that the data is not available anymore and a hazelcast.errors.StaleSequenceError is thrown.

A Ringbuffer currently is a replicated, but not partitioned data structure. So all data is stored in a single partition, similarly to the hazelcast.internal.asyncio_proxy.queue.Queue implementation.

A Ringbuffer can be used in a way similar to the Queue, but one of the key differences is that a hazelcast.internal.asyncio_proxy.queue.Queue.take() is destructive, meaning that only 1 consumer is able to take an item. A read_one() is not destructive, so you can have multiple consumers reading the same item multiple times.

Example

>>> rb = await client.get_ringbuffer("my_ringbuffer")
>>> await rb.add("item")
>>> print("read_one", await rb.read_one(0))
async capacity() int[source]

Returns the capacity of this Ringbuffer.

Returns:

The capacity of Ringbuffer.

async size() int[source]

Returns number of items in the Ringbuffer.

Returns:

The size of Ringbuffer.

async tail_sequence() int[source]

Returns the sequence of the tail.

The tail is the side of the Ringbuffer where the items are added to. The initial value of the tail is -1.

Returns:

The sequence of the tail.

async head_sequence() int[source]

Returns the sequence of the head.

The head is the side of the Ringbuffer where the oldest items in the Ringbuffer are found. If the Ringbuffer is empty, the head will be one more than the tail. The initial value of the head is 0 (1 more than tail).

Returns:

The sequence of the head.

async remaining_capacity() int[source]

Returns the remaining capacity of the Ringbuffer.

Returns:

The remaining capacity of Ringbuffer.

async add(item, overflow_policy: int = 0) int[source]

Adds the specified item to the tail of the Ringbuffer.

If there is no space in the Ringbuffer, the action is determined by overflow_policy.

Parameters:
  • item – The specified item to be added.

  • overflow_policy – the OverflowPolicy to be used when there is no space.

Returns:

The sequenceId of the added item, or -1 if the add failed.

async add_all(items: Sequence[ItemType], overflow_policy: int = 0) int[source]

Adds all items in the specified collection to the tail of the Ringbuffer.

This is likely to outperform multiple calls to add() due to better io utilization and a reduced number of executed operations. The items are added in the order of the Iterator of the collection.

If there is no space in the Ringbuffer, the action is determined by overflow_policy.

Parameters:
  • items – The specified collection which contains the items to be added.

  • overflow_policy – The OverflowPolicy to be used when there is no space.

Returns:

The sequenceId of the last written item, or -1 of the last write is failed.

async read_one(sequence: int) ItemType[source]

Reads one item from the Ringbuffer.

If the sequence is one beyond the current tail, this call blocks until an item is added. Currently, it isn’t possible to control how long this call is going to block.

Parameters:

sequence – The sequence of the item to read.

Returns:

The read item.

async read_many(start_sequence: int, min_count: int, max_count: int, filter: Any = None) ReadResult[source]

Reads a batch of items from the Ringbuffer.

If the number of available items after the first read item is smaller than max_count, these items are returned. So, number of items read may be smaller than max_count. If there are fewer items available than min_count, then this call blocks.

Warning

These blocking calls consume server memory and if there are many calls, an OutOfMemoryError may be thrown on server-side.

Reading a batch of items is likely to perform better because less overhead is involved.

A filter can be provided to select items that need to be read. If the filter is None, all items are read. If the filter is not None, items where the filter function returns true are returned. Using filters is a good way to prevent getting items that are of no value to the receiver. This reduces the amount of IO and the number of operations being executed, and can result in a significant performance improvement. Note that, filtering logic must be defined on the server-side.

If the start_sequence is smaller than the smallest sequence still available in the Ringbuffer (head_sequence()), then the smallest available sequence will be used as the start sequence and the minimum/maximum number of items will be attempted to be read from there on.

If the start_sequence is bigger than the last available sequence in the Ringbuffer (tail_sequence()), then the last available sequence plus one will be used as the start sequence and the call will block until further items become available and it can read at least the minimum number of items.

Parameters:
  • start_sequence – The start sequence of the first item to read.

  • min_count – The minimum number of items to read.

  • max_count – The maximum number of items to read.

  • filter – Filter to select returned elements.

Returns:

The list of read items.

class Semaphore(context, group_id, service_name, proxy_name, object_name)[source]

Bases: BaseCPProxy

A linearizable, distributed semaphore.

Semaphores are often used to restrict the number of callers that can access some physical or logical resource.

Semaphore is a cluster-wide counting semaphore. Conceptually, it maintains a set of permits. Each acquire() blocks if necessary until a permit is available, and then takes it. Dually, each release() adds a permit, potentially releasing a blocking acquirer. However, no actual permit objects are used; the semaphore just keeps a count of the number available and acts accordingly.

Hazelcast’s distributed semaphore implementation guarantees that callers invoking any of the acquire() methods are selected to obtain permits in the order of their invocations (first-in-first-out; FIFO). Note that FIFO ordering implies the order which the primary replica of an Semaphore receives these acquire requests. Therefore, it is possible for one member to invoke acquire() before another member, but its request hits the primary replica after the other member.

This class also provides convenient ways to work with multiple permits at once. Beware of the increased risk of indefinite postponement when using the multiple-permit acquire. If permits are released one by one, a caller waiting for one permit will acquire it before a caller waiting for multiple permits regardless of the call order.

Correct usage of a semaphore is established by programming convention in the application.

It works on top of the Raft consensus algorithm. It offers linearizability during crash failures and network partitions. It is CP with respect to the CAP principle. If a network partition occurs, it remains available on at most one side of the partition.

It has 2 variations:

  • The default implementation accessed via cp_subsystem is session-aware. In this one, when a caller makes its very first acquire() call, it starts a new CP session with the underlying CP group. Then, liveliness of the caller is tracked via this CP session. When the caller fails, permits acquired by this caller are automatically and safely released. However, the session-aware version comes with a limitation, that is, a client cannot release permits before acquiring them first. In other words, a client can release only the permits it has acquired earlier. It means, you can acquire a permit from one thread and release it from another thread using the same Hazelcast client, but not different instances of Hazelcast client. You can use the session-aware CP Semaphore implementation by disabling JDK compatibility via jdk-compatible server-side setting. Although the session-aware implementation has a minor difference to the JDK Semaphore, we think it is a better fit for distributed environments because of its safe auto-cleanup mechanism for acquired permits.

  • The second implementation offered by cp_subsystem is sessionless. This implementation does not perform auto-cleanup of acquired permits on failures. Acquired permits are not bound to threads and permits can be released without acquiring first. However, you need to handle failed permit owners on your own. If a Hazelcast server or a client fails while holding some permits, they will not be automatically released. You can use the sessionless CP Semaphore implementation by enabling JDK compatibility via jdk-compatible server-side setting.

There is a subtle difference between the lock and semaphore abstractions. A lock can be assigned to at most one endpoint at a time, so we have a total order among its holders. However, permits of a semaphore can be assigned to multiple endpoints at a time, which implies that we may not have a total order among permit holders. In fact, permit holders are partially ordered. For this reason, the fencing token approach, which is explained in FencedLock, does not work for the semaphore abstraction. Moreover, each permit is an independent entity. Multiple permit acquires and reentrant lock acquires of a single endpoint are not equivalent. The only case where a semaphore behaves like a lock is the binary case, where the semaphore has only 1 permit. In this case, the semaphore works like a non-reentrant lock.

All of the API methods in the new CP Semaphore implementation offer the exactly-once execution semantics for the session-aware version. For instance, even if a release() call is internally retried because of a crashed Hazelcast member, the permit is released only once. However, this guarantee is not given for the sessionless, a.k.a, JDK-compatible CP Semaphore.

async init(permits: int) bool[source]

Tries to initialize this Semaphore instance with the given permit count.

Parameters:

permits – The given permit count.

Returns:

True if the initialization succeeds, False if already initialized.

Raises:

AssertionError – If the permits is negative.

async acquire(permits: int = 1) None[source]

Acquires the given number of permits if they are available, and returns immediately, reducing the number of available permits by the given amount.

If insufficient permits are available then the result of the returned future is not set until one of the following things happens:

  • Some other caller invokes one of the release methods for this semaphore, the current caller is next to be assigned permits and the number of available permits satisfies this request,

  • This Semaphore instance is destroyed

Parameters:

permits – Optional number of permits to acquire; defaults to 1 when not specified

Raises:

AssertionError – If the permits is not positive.

async available_permits() int[source]

Returns the current number of permits currently available in this semaphore.

This method is typically used for debugging and testing purposes.

Returns:

The number of permits available in this semaphore.

async drain_permits() int[source]

Acquires and returns all permits that are available at invocation time.

Returns:

The number of permits drained.

async reduce_permits(reduction: int) None[source]

Reduces the number of available permits by the indicated amount.

This method differs from acquire as it does not block until permits become available. Similarly, if the caller has acquired some permits, they are not released with this call.

Parameters:

reduction – The number of permits to reduce.

Raises:

AssertionError – If the reduction is negative.

async increase_permits(increase: int) None[source]

Increases the number of available permits by the indicated amount.

If there are some callers waiting for permits to become available, they will be notified. Moreover, if the caller has acquired some permits, they are not released with this call.

Parameters:

increase – The number of permits to increase.

Raises:

AssertionError – If increase is negative.

async release(permits: int = 1) None[source]

Releases the given number of permits and increases the number of available permits by that amount.

If some callers in the cluster are blocked for acquiring permits, they will be notified.

If the underlying Semaphore implementation is non-JDK-compatible (configured via jdk-compatible server-side setting), then a client can only release a permit which it has acquired before. In other words, a client cannot release a permit without acquiring it first.

Otherwise, which means the underlying implementation is JDK compatible (configured via jdk-compatible server-side setting), there is no requirement that a client that releases a permit must have acquired that permit by calling one of the acquire() methods. A client can freely release a permit without acquiring it first. In this case, correct usage of a semaphore is established by programming convention in the application.

Parameters:

permits – Optional number of permits to release; defaults to 1 when not specified.

Raises:
  • AssertionError – If the permits is not positive.

  • IllegalStateError – if the Semaphore is non-JDK-compatible and the caller does not have a permit

async try_acquire(permits: int = 1, timeout: float = 0) bool[source]

Acquires the given number of permits and returns True, if they become available during the given waiting time.

If permits are acquired, the number of available permits in the Semaphore instance is also reduced by the given amount.

If no sufficient permits are available, then the result of the returned future is not set until one of the following things happens:

  • Permits are released by other callers, the current caller is next to be assigned permits and the number of available permits satisfies this request

  • The specified waiting time elapses

Parameters:
  • permits – The number of permits to acquire; defaults to 1 when not specified.

  • timeout – Optional timeout in seconds to wait for the permits; when it’s not specified the operation will return immediately after the acquire attempt.

Returns:

True if all permits were acquired, False if the waiting time elapsed before all permits could be acquired

Raises:

AssertionError – If the permits is not positive.

class Set(service_name, name, context)[source]

Bases: PartitionSpecificProxy, Generic[ItemType]

Concurrent, distributed implementation of Set.

Example

>>> my_set = await client.get_set("my_set")
>>> print("set.add", await my_set.add("item"))
>>> print("set.size", await my_set.size())

Warning

Asyncio client set proxy is not thread-safe, do not access it from other threads.

async add(item: ItemType) bool[source]

Adds the specified item if it is not exists in this set.

Parameters:

item – The specified item to be added.

Returns:

True if this set is changed after call, False otherwise.

async add_all(items: Sequence[ItemType]) bool[source]

Adds the elements in the specified collection if they’re not exist in this set.

Parameters:

items – Collection which includes the items to be added.

Returns:

True if this set is changed after call, False otherwise.

async add_listener(include_value: bool = False, item_added_func: Callable[[ItemEvent[ItemType]], None] = None, item_removed_func: Callable[[ItemEvent[ItemType]], None] = None) str[source]

Adds an item listener for this container.

Listener will be notified for all container add/remove events.

Parameters:
  • include_value – Whether received events include the updated item or not.

  • item_added_func – Function to be called when an item is added to this set.

  • item_removed_func – Function to be called when an item is deleted from this set.

Returns:

A registration id which is used as a key to remove the listener.

async clear() None[source]

Clears the set. Set will be empty with this call.

async contains(item: ItemType) bool[source]

Determines whether this set contains the specified item or not.

Parameters:

item – The specified item to be searched.

Returns:

True if the specified item exists in this set, False otherwise.

async contains_all(items: Sequence[ItemType]) bool[source]

Determines whether this set contains all items in the specified collection or not.

Parameters:

items – The specified collection which includes the items to be searched.

Returns:

True if all the items in the specified collection exist in this set, False otherwise.

async get_all() List[ItemType][source]

Returns all the items in the set.

Returns:

List of the items in this set.

async is_empty() bool[source]

Determines whether this set is empty or not.

Returns:

True if this set is empty, False otherwise.

async remove(item: ItemType) bool[source]

Removes the specified element from the set if it exists.

Parameters:

item – The specified element to be removed.

Returns:

True if the specified element exists in this set, False otherwise.

async remove_all(items: Sequence[ItemType]) bool[source]

Removes all of the elements of the specified collection from this set.

Parameters:

items – The specified collection.

Returns:

True if the call changed this set, False otherwise.

async remove_listener(registration_id: str) bool[source]

Removes the specified item listener.

Returns silently if the specified listener was not added before.

Parameters:

registration_id – Id of the listener to be deleted.

Returns:

True if the item listener is removed, False otherwise.

async retain_all(items: Sequence[ItemType]) bool[source]

Removes the items which are not contained in the specified collection.

In other words, only the items that are contained in the specified collection will be retained.

Parameters:

items – Collection which includes the elements to be retained in this set.

Returns:

True if this set changed as a result of the call, False otherwise.

async size() int[source]

Returns the number of items in this set.

Returns:

Number of items in this set.

class VectorCollection(service_name, name, context)[source]

Bases: Proxy, Generic[KeyType, ValueType]

VectorCollection contains documents with vectors.

Concurrent, distributed, observable and searchable vector collection.

The configuration of the vector collection must exist before it can be used.

Example

>>> await client.create_vector_collection_config("my_vc", [
>>>    IndexConfig(name="default-vector", metric=Metric.COSINE, dimension=2)
>>> ]
>>> my_vc = await client.get_vector_collection("my_vc")
>>> await my_vc.set("key1", Vector("default-vector", Type.DENSE, [0.1, 0.2])

Warning

Asyncio client vector collection proxy is not thread-safe, do not access it from other threads.

Warning

Asyncio client is BETA. Its public API may change until General Availability release.

async get(key: Any) Document | None[source]

Returns the Document for the specified key, or None if this VectorCollection does not contain this key.

Warning

This method returns a clone of the original Document. Modifying the returned Document does not change the actual Document in the VectorCollection. Put the modified Document back to make changes visible to all nodes.

>>> doc = await my_vc.get(key)
>>> doc.value.update_some_property()
>>> await my_vc.set(key, doc)

Warning

This method uses __hash__ and __eq__ methods of binary form of the key, not the actual implementations of __hash__ and __eq__ defined in the key’s class.

Parameters:

key – The specified key.

Returns:

The Document for the specified key or None if there was no mapping for key.

async set(key: Any, document: Document) None[source]

Sets a document for the given key in the VectorCollection.

Similar to the put operation except that set doesn’t return the old document, which is more efficient.

Warning

This method uses __hash__ and __eq__ methods of binary form of the key, not the actual implementations of __hash__ and __eq__ defined in key’s class.

Parameters:
  • key – Key of the entry.

  • document – Document of the entry.

async put(key: Any, document: Document) Document | None[source]

Associates the specified Document with the specified key in this VectorCollection.

If the VectorCollection previously contained a mapping for the key, the old Document is replaced by the specified Document. If the previous value is not needed, using the set method is more efficient.

Warning

This method returns a clone of the previous Document, not the original (identically equal) Document previously put into the VectorCollection.

Warning

This method uses __hash__ and __eq__ methods of binary form of the key, not the actual implementations of __hash__ and __eq__ defined in the key’s class.

Parameters:
  • key – Key of the entry.

  • document – Document of the entry.

Returns:

Previous Document associated with the key or None if there was no mapping for the key.

async put_all(map: Dict[Any, Document]) None[source]

Copies all the mappings from the specified dictionary to this VectorCollection.

No atomicity guarantees are given. In the case of a failure, some key-document tuples may get written, while others are not.

Parameters:

map – Dictionary which includes mappings to be stored in this VectorCollection.

async put_if_absent(key: Any, document: Document) Document | None[source]

Associates the specified key with the given Document if it is not already associated.

Warning

This method returns a clone of the previous Document, not the original (identically equal) Document previously put into the VectorCollection.

Warning

This method uses __hash__ and __eq__ methods of binary form of the key, not the actual implementations of __hash__ and __eq__ defined in key’s class.

Parameters:
  • key – Key of the entry.

  • document – Document of the entry.

Returns:

Old Document for the given key or None if there is not one.

async search_near_vector(vector: Vector, *, include_value: bool = False, include_vectors: bool = False, limit: int = 10, hints: Dict[str, str] = None) List[SearchResult][source]

Returns the Documents closest to the given vector.

The search is performed using the distance metric set when creating the vector index.

Parameters:
  • vector – The vector to be used as the reference. It must have the same dimension as specified when creating the vector index.

  • include_value – Return value attached to the Document.

  • include_vectors – Return vectors attached to the Document.

  • limit – Limit the maximum number of Documents returned. If not set, 10 is used as the default limit.

Returns:

List of search results.

async remove(key: Any) Document | None[source]

Removes the mapping for a key from this VectorCollection if it is present (optional operation).

The VectorCollection will not contain a mapping for the specified key once the call returns.

Warning

This method uses __hash__ and __eq__ methods of binary form of the key, not the actual implementations of __hash__ and __eq__ defined in the key’s class.

Parameters:

key – Key of the mapping to be deleted.

Returns:

The Document associated with key, or None if there was no mapping for key.

async delete(key: Any) None[source]

Removes the mapping for a key from this VectorCollection if it is present (optional operation).

Unlike remove(object), this operation does not return the removed Document, which avoids the serialization cost of the returned Document. If the removed Document will not be used, a delete operation is preferred over a remove operation for better performance.

The VectorCollection will not contain a mapping for the specified key once the call returns.

Parameters:

key – Key of the mapping to be deleted.

async optimize(index_name: str = None) None[source]

Optimize index by fully removing nodes marked for deletion, trimming neighbor sets to the advertised degree, and updating the entry node as necessary.

Warning

This operation can take a long time to execute and consume a lot of server resources.

Parameters:

index_name – Name of the index to optimize. If not specified, the only index defined for the collection will be used. Must be specified if the collection has more than one index.

async clear() None[source]

Clears the VectorCollection.

async size() int[source]

Returns the number of Documents in this VectorCollection.

Returns:

Number of Documents in this VectorCollection.