Asyncio HazelcastClient API Documentation
- class AtomicLong(context, group_id, service_name, proxy_name, object_name)[source]
Bases:
BaseCPProxyAtomicLong is a redundant and highly available distributed counter for 64-bit integers (
longtype in Java).It works on top of the Raft consensus algorithm. It offers linearizability during crash failures and network partitions. It is CP with respect to the CAP principle. If a network partition occurs, it remains available on at most one side of the partition.
AtomicLong implementation does not offer exactly-once / effectively-once execution semantics. It goes with at-least-once execution semantics by default and can cause an API call to be committed multiple times in case of CP member failures. It can be tuned to offer at-most-once execution semantics. Please see fail-on-indeterminate-operation-state server-side setting.
- async add_and_get(delta: int) int[source]
Atomically adds the given value to the current value.
- Parameters:
delta – The value to add to the current value.
- Returns:
The updated value, the given value added to the current value.
- async compare_and_set(expect: int, update: int) bool[source]
Atomically sets the value to the given updated value only if the current value equals the expected value.
- Parameters:
expect – The expected value.
update – The new value.
- Returns:
Trueif successful; orFalseif the actual value was not equal to the expected value.
- async decrement_and_get() int[source]
Atomically decrements the current value by one.
- Returns:
The updated value, the current value decremented by one.
- async get_and_decrement() int[source]
Atomically decrements the current value by one.
- Returns:
The old value.
- async get_and_add(delta: int) int[source]
Atomically adds the given value to the current value.
- Parameters:
delta – The value to add to the current value.
- Returns:
The old value before the add.
- async get_and_set(new_value: int) int[source]
Atomically sets the given value and returns the old value.
- Parameters:
new_value – The new value.
- Returns:
The old value.
- async increment_and_get() int[source]
Atomically increments the current value by one.
- Returns:
The updated value, the current value incremented by one.
- async get_and_increment() int[source]
Atomically increments the current value by one.
- Returns:
The old value.
- async set(new_value: int) None[source]
Atomically sets the given value.
- Parameters:
new_value – The new value
- async alter(function: Any) None[source]
Alters the currently stored value by applying a function on it.
Notes
functionmust be an instance of Hazelcast serializable type. It must have a counterpart registered in the server-side that implements thecom.hazelcast.core.IFunctioninterface with the actual logic of the function to be applied.- Parameters:
function – The function that alters the currently stored value.
- async alter_and_get(function: Any) int[source]
Alters the currently stored value by applying a function on it and gets the result.
Notes
functionmust be an instance of Hazelcast serializable type. It must have a counterpart registered in the server-side that implements thecom.hazelcast.core.IFunctioninterface with the actual logic of the function to be applied.- Parameters:
function – The function that alters the currently stored value.
- Returns:
The new value.
- async get_and_alter(function: Any) int[source]
Alters the currently stored value by applying a function on it and gets the old value.
Notes
functionmust be an instance of Hazelcast serializable type. It must have a counterpart registered in the server-side that implements thecom.hazelcast.core.IFunctioninterface with the actual logic of the function to be applied.- Parameters:
function – The function that alters the currently stored value.
- Returns:
The old value.
- async apply(function: Any) Any[source]
Applies a function on the value, the actual stored value will not change.
Notes
functionmust be an instance of Hazelcast serializable type. It must have a counterpart registered in the server-side that implements thecom.hazelcast.core.IFunctioninterface with the actual logic of the function to be applied.- Parameters:
function – The function applied to the currently stored value.
- Returns:
The result of the function application.
- class AtomicReference(context, group_id, service_name, proxy_name, object_name)[source]
Bases:
BaseCPProxy,Generic[ElementType]A distributed, highly available object reference with atomic operations.
AtomicReference offers linearizability during crash failures and network partitions. It is CP with respect to the CAP principle. If a network partition occurs, it remains available on at most one side of the partition.
The following are some considerations you need to know when you use AtomicReference:
AtomicReference works based on the byte-content and not on the object-reference. If you use the
compare_and_set()method, do not change the original value because its serialized content will then be different.All methods returning an object return a private copy. You can modify the private copy, but the rest of the world is shielded from your changes. If you want these changes to be visible to the rest of the world, you need to write the change back to the AtomicReference; but be careful about introducing a data-race.
The in-memory format of an AtomicReference is
binary. The receiving side does not need to have the class definition available unless it needs to be deserialized on the other side., e.g., because a method like alter() is executed. This deserialization is done for every call that needs to have the object instead of the binary content, so be careful with expensive object graphs that need to be deserialized.If you have an object with many fields or an object graph, and you only need to calculate some information or need a subset of fields, you can use the apply() method. With the apply() method, the whole object does not need to be sent over the line; only the information that is relevant is sent.
IAtomicReference does not offer exactly-once / effectively-once execution semantics. It goes with at-least-once execution semantics by default and can cause an API call to be committed multiple times in case of CP member failures. It can be tuned to offer at-most-once execution semantics. Please see fail-on-indeterminate-operation-state server-side setting.
- async compare_and_set(expect: ElementType | None, update: ElementType | None) bool[source]
Atomically sets the value to the given updated value only if the current value is equal to the expected value.
- Parameters:
expect – The expected value.
update – The new value.
- Returns:
Trueif successful, orFalseif the actual value was not equal to the expected value.
- async set(new_value: ElementType | None) None[source]
Atomically sets the given value.
- Parameters:
new_value – The new value.
- async get_and_set(new_value: ElementType | None) ElementType | None[source]
Gets the old value and sets the new value.
- Parameters:
new_value – The new value.
- Returns:
The old value.
- async is_none() bool[source]
Checks if the stored reference is
None.- Returns:
Trueif the stored reference isNone,Falseotherwise.
- async contains(value: ElementType | None) bool[source]
Checks if the reference contains the value.
- Parameters:
value – The value to check (is allowed to be
None).- Returns:
Trueif the value is found,Falseotherwise.
- async alter(function: Any) None[source]
Alters the currently stored reference by applying a function on it.
Notes
functionmust be an instance of Hazelcast serializable type. It must have a counterpart registered in the server-side that implements thecom.hazelcast.core.IFunctioninterface with the actual logic of the function to be applied.- Parameters:
function – The function that alters the currently stored reference.
- async alter_and_get(function: Any) ElementType | None[source]
Alters the currently stored reference by applying a function on it and gets the result.
Notes
functionmust be an instance of Hazelcast serializable type. It must have a counterpart registered in the server-side that implements thecom.hazelcast.core.IFunctioninterface with the actual logic of the function to be applied.- Parameters:
function – The function that alters the currently stored reference.
- Returns:
The new value, the result of the applied function.
- async get_and_alter(function: Any) ElementType | None[source]
Alters the currently stored reference by applying a function on it on and gets the old value.
Notes
functionmust be an instance of Hazelcast serializable type. It must have a counterpart registered in the server-side that implements thecom.hazelcast.core.IFunctioninterface with the actual logic of the function to be applied.- Parameters:
function – The function that alters the currently stored reference.
- Returns:
The old value, the value before the function is applied.
- async apply(function: Any) ElementType | None[source]
Applies a function on the value, the actual stored value will not change.
Notes
functionmust be an instance of Hazelcast serializable type. It must have a counterpart registered in the server-side that implements thecom.hazelcast.core.IFunctioninterface with the actual logic of the function to be applied.- Parameters:
function – The function applied on the currently stored reference.
- Returns:
The result of the function application.
- class CPSubsystem(context)[source]
Bases:
objectCP Subsystem is a component of Hazelcast that builds a strongly consistent layer for a set of distributed data structures.
Its APIs can be used for implementing distributed coordination use cases, such as leader election, distributed locking, synchronization, and metadata management.
Its data structures are CP with respect to the CAP principle, i.e., they always maintain linearizability and prefer consistency to availability during network partitions. Besides network partitions, CP Subsystem withstands server and client failures.
Data structures in CP Subsystem run in CP groups. Each CP group elects its own Raft leader and runs the Raft consensus algorithm independently.
The CP data structures differ from the other Hazelcast data structures in two aspects. First, an internal commit is performed on the METADATA CP group every time you fetch a proxy from this interface. Hence, callers should cache returned proxy objects. Second, if you call
destroy()on a CP data structure proxy, that data structure is terminated on the underlying CP group and cannot be reinitialized until the CP group is force-destroyed. For this reason, please make sure that you are completely done with a CP data structure before destroying its proxy.- async get_atomic_long(name: str) AtomicLong[source]
Returns the distributed AtomicLong instance with given name.
The instance is created on CP Subsystem.
If no group name is given within the
nameargument, then the AtomicLong instance will be created on the default CP group. If a group name is given, like.get_atomic_long("myLong@group1"), the given group will be initialized first, if not initialized already, and then the instance will be created on this group.- Parameters:
name – Name of the AtomicLong.
- Returns:
The AtomicLong proxy for the given name.
- async get_atomic_reference(name: str) AtomicReference[source]
Returns the distributed AtomicReference instance with given name.
The instance is created on CP Subsystem.
If no group name is given within the
nameargument, then the AtomicReference instance will be created on the DEFAULT CP group. If a group name is given, like.get_atomic_reference("myRef@group1"), the given group will be initialized first, if not initialized already, and then the instance will be created on this group.- Parameters:
name – Name of the AtomicReference.
- Returns:
The AtomicReference proxy for the given name.
- async get_count_down_latch(name: str) CountDownLatch[source]
Returns the distributed CountDownLatch instance with given name.
The instance is created on CP Subsystem.
If no group name is given within the
nameargument, then the CountDownLatch instance will be created on the DEFAULT CP group. If a group name is given, like.get_count_down_latch("myLatch@group1"), the given group will be initialized first, if not initialized already, and then the instance will be created on this group.- Parameters:
name – Name of the CountDownLatch.
- Returns:
The CountDownLatch proxy for the given name.
- async get_semaphore(name: str) Semaphore[source]
Returns the distributed Semaphore instance with given name.
The instance is created on CP Subsystem.
If no group name is given within the
nameargument, then the Semaphore instance will be created on the DEFAULT CP group. If a group name is given, like.get_semaphore("mySemaphore@group1"), the given group will be initialized first, if not initialized already, and then the instance will be created on this group.- Parameters:
name – Name of the Semaphore
- Returns:
The Semaphore proxy for the given name.
- class CountDownLatch(context, group_id, service_name, proxy_name, object_name)[source]
Bases:
BaseCPProxyA distributed, concurrent countdown latch data structure.
CountDownLatch is a cluster-wide synchronization aid that allows one or more callers to wait until a set of operations being performed in other callers completes.
CountDownLatch count can be reset using
try_set_count()method after a countdown has finished but not during an active count. This allows the same latch instance to be reused.There is no
await_latch()method to wait indefinitely since this is undesirable in a distributed application: for example, a cluster can split or the master and replicas could all terminate. In most cases, it is best to configure an explicit timeout, so you have the ability to deal with these situations.All the API methods in the CountDownLatch offer the exactly-once execution semantics. For instance, even if a
count_down()call is internally retried because of crashed Hazelcast member, the counter value is decremented only once.- async await_latch(timeout: float) bool[source]
Causes the current thread to wait until the latch has counted down to zero, or an exception is thrown, or the specified waiting time elapses.
If the current count is zero then this method returns
True.If the current count is greater than zero, then the current thread becomes disabled for thread scheduling purposes and lies dormant until one of the following things happen:
The count reaches zero due to invocations of the
count_down()methodThis CountDownLatch instance is destroyed
The countdown owner becomes disconnected
The specified waiting time elapses
If the count reaches zero, then the method returns with the value
True.If the specified waiting time elapses then the value
Falseis returned. If the time is less than or equal to zero, the method will not wait at all.- Parameters:
timeout – The maximum time to wait in seconds
- Returns:
Trueif the count reached zero,Falseif the waiting time elapsed before the count reached zero- Raises:
IllegalStateError – If the Hazelcast instance was shut down while waiting.
- async count_down() None[source]
Decrements the count of the latch, releasing all waiting threads if the count reaches zero.
If the current count is greater than zero, then it is decremented. If the new count is zero:
All waiting threads are re-enabled for thread scheduling purposes
Countdown owner is set to
None.
If the current count equals zero, then nothing happens.
- async try_set_count(count: int) bool[source]
Sets the count to the given value if the current count is zero.
If count is not zero, then this method does nothing and returns
False.- Parameters:
count – The number of times
count_down()must be invoked before callers can pass throughawait_latch().- Returns:
Trueif the new count was set,Falseif the current count is not zero.
- class FlakeIdGenerator(service_name, name, context)[source]
Bases:
ProxyA cluster-wide unique ID generator. Generated IDs are int values and are k-ordered (roughly ordered). IDs are in the range from 0 to 2^63 - 1.
The IDs contain a timestamp component and a node ID component, which is assigned when the member joins the cluster. This allows the IDs to be ordered and unique without any coordination between members, which makes the generator safe even in split-brain scenario.
Timestamp component is in milliseconds since 1.1.2018, 0:00 UTC and has 41 bits. This caps the useful lifespan of the generator to little less than 70 years (until ~2088). The sequence component is 6 bits. If more than 64 IDs are requested in single millisecond, IDs will gracefully overflow to the next millisecond and uniqueness is guaranteed in this case. The implementation does not allow overflowing by more than 15 seconds, if IDs are requested at higher rate, the call will block. Note, however, that clients are able to generate even faster because each call goes to a different (random) member and the 64 IDs/ms limit is for single member.
- Node ID overflow:
It is possible to generate IDs on any member or client as long as there is at least one member with join version smaller than 2^16 in the cluster. The remedy is to restart the cluster: nodeId will be assigned from zero again. Uniqueness after the restart will be preserved thanks to the timestamp component.
Warning
Asyncio client FlakeIdGenerator proxy is not thread-safe, do not access it from other threads.
- async new_id() int[source]
Generates and returns a cluster-wide unique ID.
This method goes to a random member and gets a batch of IDs, which will then be returned locally for a limited time. The pre-fetch size and the validity time can be configured.
Note
Values returned from this method may not be strictly ordered.
- Returns:
New cluster-wide unique ID.
- Raises:
HazelcastError – If node ID for all members in the cluster is out of valid range. See
Node ID overflownote above.
- class Executor(service_name: str, name: str, context)[source]
Bases:
ProxyAn object that executes submitted executable tasks.
- async execute_on_key_owner(key: Any, task: Any) Any[source]
Executes a task on the owner of the specified key.
- Parameters:
key – The specified key.
task – A task executed on the owner of the specified key.
- Returns:
The result of the task.
- async execute_on_member(member: MemberInfo, task: Any) Any[source]
Executes a task on the specified member.
- Parameters:
member – The specified member.
task – The task executed on the specified member.
- Returns:
The result of the task.
- async execute_on_members(members: Sequence[MemberInfo], task: Any) List[Any][source]
Executes a task on each of the specified members.
- Parameters:
members – The specified members.
task – The task executed on the specified members.
- Returns:
The list of results of the tasks on each member.
- async execute_on_all_members(task: Any) List[Any][source]
Executes a task on all the known cluster members.
- Parameters:
task – The task executed on the all the members.
- Returns:
The list of results of the tasks on each member.
- class HazelcastClient(config: Config | None = None, **kwargs)[source]
Bases:
objectHazelcast client instance to access and manipulate distributed data structures on the Hazelcast clusters.
The client can be configured either by:
providing a configuration object as the first parameter of the constructor
from hazelcast.asyncio import HazelcastClient from hazelcast.config import Config config = Config() config.cluster_name = "a-cluster" client = await HazelcastClient.create_and_start(config)
passing configuration options as keyword arguments
from hazelcast.asyncio import HazelcastClient client = await HazelcastClient.create_and_start( cluster_name="a-cluster", )
Warning
Asyncio client is not thread-safe, do not access it from other threads.
See the
hazelcast.config.Configdocumentation for the possible configuration options.Creates a HazelcastClient instance.
This call just creates the instance, without starting it.
The preferred way of creating and starting the client instance is using the
create_and_startmethod:from hazelcast.asyncio import HazelcastClient client = await HazelcastClient.create_and_start()
See the
hazelcast.config.Configdocumentation for the possible configuration options.- Parameters:
config – Optional configuration object.
**kwargs – Optional keyword arguments of the client configuration.
- async classmethod create_and_start(config: Config | None = None, **kwargs) HazelcastClient[source]
Creates a HazelcastClient instance, and starts it.
from hazelcast.asyncio import HazelcastClient client = await HazelcastClient.create_and_start()
See the
hazelcast.config.Configdocumentation for the possible configuration options.- Parameters:
config – Optional configuration object.
**kwargs – Optional keyword arguments of the client configuration.
- async get_executor(name: str) Executor[source]
Returns the executor instance with the specified name.
- Parameters:
name – Name of the executor.
- Returns:
Executor instance with the specified name.
- async get_list(name: str) List[KeyType][source]
Returns the distributed list instance with the specified name.
- Parameters:
name – Name of the distributed list.
- Returns:
Distributed list instance with the specified name.
- async get_map(name: str) Map[KeyType, ValueType][source]
Returns the distributed map instance with the specified name.
- Parameters:
name – Name of the distributed map.
- Returns:
Distributed map instance with the specified name.
- async get_multi_map(name: str) MultiMap[KeyType, ValueType][source]
Returns the distributed MultiMap instance with the specified name.
- Parameters:
name – Name of the distributed MultiMap.
- Returns:
Distributed MultiMap instance with the specified name.
- async get_queue(name: str) Queue[KeyType][source]
Returns the distributed queue instance with the specified name.
- Parameters:
name – Name of the distributed queue.
- Returns:
Distributed queue instance with the specified name.
- async get_set(name: str) Set[KeyType][source]
Returns the distributed set instance with the specified name.
- Parameters:
name – Name of the distributed set.
- Returns:
Distributed set instance with the specified name.
- async get_replicated_map(name: str) ReplicatedMap[KeyType, ValueType][source]
Returns the distributed ReplicatedMap instance with the specified name.
- Parameters:
name – Name of the distributed replicated map.
- Returns:
Distributed ReplicatedMap instance with the specified name.
- async get_flake_id_generator(name: str) FlakeIdGenerator[source]
Returns the FlakeIdGenerator instance with the specified name.
- Parameters:
name – Name of the FlakeIdGenerator.
- Returns:
FlakeIdGenerator instance with the specified name.
- async get_reliable_topic(name: str) ReliableTopic[source]
Returns the ReliableTopic instance with the specified name.
- Parameters:
name – Name of the ReliableTopic.
- Returns:
Distributed ReliableTopic instance with the specified name.
- async get_ringbuffer(name: str) Ringbuffer[ItemType][source]
Returns the distributed Ringbuffer instance with the specified name.
- Parameters:
name – Name of the distributed ringbuffer.
- Returns:
Distributed Ringbuffer instance with the specified name.
- async get_pn_counter(name: str) PNCounter[source]
Returns the PN Counter instance with the specified name.
- Parameters:
name – Name of the PN Counter.
- Returns:
Distributed PN Counter instance with the specified name.
- async get_topic(name: str) Topic[MessageType][source]
Returns the distributed topic instance with the specified name.
- Parameters:
name – Name of the distributed topic.
- Returns:
Distributed topic instance with the specified name.
- async create_vector_collection_config(name: str, indexes: List[IndexConfig], backup_count: int = 1, async_backup_count: int = 0, split_brain_protection_name: str | None = None, merge_policy: str = 'PutIfAbsentMergePolicy', merge_batch_size: int = 100) None[source]
Creates a vector collection with the given configuration.
- Parameters:
name – Name of the distributed map.
indexes – One or more index configurations. The index names must be unique.
backup_count – Number of backups to keep for the vector collection.
split_brain_protection_name – Name of the split brain protection configuration. See https://docs.hazelcast.com/hazelcast/5.6/data-structures/vector-collections#split-brain-protection
merge_policy – The merge policy to use while recovering in a split brain situation. See https://docs.hazelcast.com/hazelcast/5.6/data-structures/vector-collections#merge-policy
- async get_vector_collection(name: str) VectorCollection[source]
Returns the vector collection instance with the specified name.
- Parameters:
name – Name of the vector collection.
- Returns:
Vector collection instance with the specified name.
- async add_distributed_object_listener(listener_func: Callable[[DistributedObjectEvent], None]) str[source]
Adds a listener which will be notified when a new distributed object is created or destroyed.
- Parameters:
listener_func – Function to be called when a distributed object is created or destroyed.
- Returns:
A registration id which is used as a key to remove the listener.
- async remove_distributed_object_listener(registration_id: str) bool[source]
Removes the specified distributed object listener.
Returns silently if there is no such listener added before.
- Parameters:
registration_id – The id of the registered listener.
- Returns:
Trueif registration is removed,Falseotherwise.
- property name: str
Name of the client.
- property lifecycle_service: LifecycleService
Lifecycle service allows you to check if the client is running and add and remove lifecycle listeners.
- property partition_service: PartitionService
Partition service allows you to get partition count, introspect the partition owners, and partition ids of keys.
- property cluster_service: ClusterService
Cluster service allows you to get the list of the cluster members and add and remove membership listeners.
- Type:
- property sql: SqlService
Returns a service to execute distributed SQL queries.
- property cp_subsystem: CPSubsystem
CP Subsystem offers set of in-memory linearizable data structures.
- class List(service_name, name, context)[source]
Bases:
PartitionSpecificProxy,Generic[ItemType]Concurrent, distributed implementation of List.
The Hazelcast List is not a partitioned data-structure. So all the content of the List is stored in a single machine (and in the backup). So the List will not scale by adding more members in the cluster.
Example
>>> my_list = await client.get_list("my_list") >>> print("list.add", await my_list.add("item")) >>> print("list.size", await my_list.size())
Warning
Asyncio client list proxy is not thread-safe, do not access it from other threads.
- async add(item: ItemType) bool[source]
Adds the specified item to the end of this list.
- Parameters:
item – the specified item to be appended to this list.
- Returns:
Trueif item is added,Falseotherwise.
- async add_at(index: int, item: ItemType) None[source]
Adds the specified item at the specific position in this list. Element in this position and following elements are shifted to the right, if any.
- Parameters:
index – The specified index to insert the item.
item – The specified item to be inserted.
- async add_all(items: Sequence[ItemType]) bool[source]
Adds all of the items in the specified collection to the end of this list.
The order of new elements is determined by the specified collection’s iterator.
- Parameters:
items – The specified collection which includes the elements to be added to list.
- Returns:
Trueif this call changed the list,Falseotherwise.
- async add_all_at(index: int, items: Sequence[ItemType]) bool[source]
Adds all of the elements in the specified collection into this list at the specified position.
Elements in this positions and following elements are shifted to the right, if any. The order of new elements is determined by the specified collection’s iterator.
- Parameters:
index – The specified index at which the first element of specified collection is added.
items – The specified collection which includes the elements to be added to list.
- Returns:
Trueif this call changed the list,Falseotherwise.
- async add_listener(include_value: bool = False, item_added_func: Callable[[ItemEvent[ItemType]], None] = None, item_removed_func: Callable[[ItemEvent[ItemType]], None] = None) str[source]
Adds an item listener for this list. Listener will be notified for all list add/remove events.
- Parameters:
include_value – Whether received events include the updated item or not.
item_added_func – To be called when an item is added to this list.
item_removed_func – To be called when an item is deleted from this list.
- Returns:
A registration id which is used as a key to remove the listener.
- async contains(item: ItemType) bool[source]
Determines whether this list contains the specified item or not.
- Parameters:
item – The specified item.
- Returns:
Trueif the specified item exists in this list,Falseotherwise.
- async contains_all(items: Sequence[ItemType]) bool[source]
Determines whether this list contains all of the items in specified collection or not.
- Parameters:
items – The specified collection which includes the items to be searched.
- Returns:
Trueif all of the items in specified collection exist in this list,Falseotherwise.
- async get(index: int) ItemType[source]
Returns the item which is in the specified position in this list.
- Parameters:
index – the specified index of the item to be returned.
- Returns:
The item in the specified position in this list.
- async get_all() List[ItemType][source]
Returns all the items in this list.
- Returns:
All the items in this list.
- async iterator() List[ItemType][source]
Returns an iterator over the elements in this list in proper sequence, same with
get_all.- Returns:
All the items in this list.
- async index_of(item: ItemType) int[source]
Returns the first index of specified item’s occurrences in this list.
If specified item is not present in this list, returns -1.
- Parameters:
item – The specified item to be searched for.
- Returns:
The first index of specified item’s occurrences,
-1if item is not present in this list.
- async is_empty() bool[source]
Determines whether this list is empty or not.
- Returns:
Trueif the list contains no elements,Falseotherwise.
- async last_index_of(item: ItemType) int[source]
Returns the last index of specified item’s occurrences in this list.
If specified item is not present in this list, returns -1.
- Parameters:
item – The specified item to be searched for.
- Returns:
The last index of specified item’s occurrences,
-1if item is not present in this list.
- async list_iterator(index: int = 0) List[ItemType][source]
Returns a list iterator of the elements in this list.
If an index is provided, iterator starts from this index.
- Parameters:
index – Index of first element to be returned from the list iterator.
- Returns:
List of the elements in this list.
- async remove(item: ItemType) bool[source]
Removes the specified element’s first occurrence from the list if it exists in this list.
- Parameters:
item – The specified element.
- Returns:
Trueif the specified element is present in this list,Falseotherwise.
- async remove_at(index: int) ItemType[source]
Removes the item at the specified position in this list.
Element in this position and following elements are shifted to the left, if any.
- Parameters:
index – Index of the item to be removed.
- Returns:
The item previously at the specified index.
- async remove_all(items: Sequence[ItemType]) bool[source]
Removes all of the elements that is present in the specified collection from this list.
- Parameters:
items – The specified collection.
- Returns:
Trueif this list changed as a result of the call,Falseotherwise.
- async remove_listener(registration_id: str) bool[source]
Removes the specified item listener.
Returns silently if the specified listener was not added before.
- Parameters:
registration_id – Id of the listener to be deleted.
- Returns:
Trueif the item listener is removed,Falseotherwise.
- async retain_all(items: Sequence[ItemType]) bool[source]
Retains only the items that are contained in the specified collection.
It means, items which are not present in the specified collection are removed from this list.
- Parameters:
items – Collections which includes the elements to be retained in this list.
- Returns:
Trueif this list changed as a result of the call,Falseotherwise.
- async size() int[source]
Returns the number of elements in this list.
- Returns:
Number of elements in this list.
- async set_at(index: int, item: ItemType) ItemType[source]
Replaces the specified element with the element at the specified position in this list.
- Parameters:
index – Index of the item to be replaced.
item – Item to be stored.
- Returns:
The previous item in the specified index.
- async sub_list(from_index: int, to_index: int) List[ItemType][source]
Returns a sublist from this list, from from_index(inclusive) to to_index(exclusive).
The returned list is backed by this list, so non-structural changes in the returned list are reflected in this list, and vice-versa.
- Parameters:
from_index – The start point(inclusive) of the sub_list.
to_index – The end point(exclusive) of the sub_list.
- Returns:
A view of the specified range within this list.
- class Map(service_name, name, context)[source]
Bases:
Proxy,Generic[KeyType,ValueType]Hazelcast Map client proxy to access the map on the cluster.
Concurrent, distributed, observable and queryable map.
Example
>>> my_map = await client.get_map("my_map") >>> print("map.put", await my_map.put("key", "value")) >>> print("map.contains_key", await my_map.contains_key("key")) >>> print("map.get", await my_map.get("key")) >>> print("map.size", await my_map.size())
This class does not allow
Noneto be used as a key or value.Warning
Asyncio client map proxy is not thread-safe, do not access it from other threads.
- async add_entry_listener(include_value: bool = False, key: KeyType = None, predicate: Predicate = None, added_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None, removed_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None, updated_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None, evicted_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None, evict_all_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None, clear_all_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None, merged_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None, expired_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None, loaded_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None) str[source]
Adds a continuous entry listener for this map.
Listener will get notified for map events filtered with given parameters.
The listener functions must not block.
- Parameters:
include_value – Whether received event should include the value or not.
key – Key for filtering the events.
predicate – Predicate for filtering the events.
added_func – Function to be called when an entry is added to map.
removed_func – Function to be called when an entry is removed from map.
updated_func – Function to be called when an entry is updated.
evicted_func – Function to be called when an entry is evicted from map.
evict_all_func – Function to be called when entries are evicted from map.
clear_all_func – Function to be called when entries are cleared from map.
merged_func – Function to be called when WAN replicated entry is merged.
expired_func – Function to be called when an entry’s live time is expired.
loaded_func – Function to be called when an entry is loaded from a map loader.
- Returns:
A registration id which is used as a key to remove the listener.
- async add_index(attributes: Sequence[str] = None, index_type: int | str = 0, name: str = None, bitmap_index_options: Dict[str, Any] = None) None[source]
Adds an index to this map for the specified entries so that queries can run faster.
Example
Let’s say your map values are Employee objects.
>>> class Employee(IdentifiedDataSerializable): >>> active = false >>> age = None >>> name = None >>> #other fields >>> >>> #methods
If you query your values mostly based on age and active fields, you should consider indexing these.
>>> employees = await client.get_map("employees") >>> await employees.add_index(attributes=["age"]) # Sorted index for range queries >>> await employees.add_index(attributes=["active"], index_type=IndexType.HASH)) # Hash index for equality predicates
Index attribute should either have a getter method or be public. You should also make sure to add the indexes before adding entries to this map.
Indexing time is executed in parallel on each partition by operation threads. The Map is not blocked during this operation. The time taken in proportional to the size of the Map and the number Members.
Until the index finishes being created, any searches for the attribute will use a full Map scan, thus avoiding using a partially built index and returning incorrect results.
- Parameters:
attributes – List of indexed attributes.
index_type – Type of the index. By default, set to
SORTED.name – Name of the index.
bitmap_index_options –
Bitmap index options.
unique_key: (str): The unique key attribute is used as a source of values which uniquely identify each entry being inserted into an index. Defaults to
KEY_ATTRIBUTE_NAME. See thehazelcast.config.QueryConstantsfor possible values.unique_key_transformation (int|str): The transformation is applied to every value extracted from the unique key attribue. Defaults to
OBJECT. See thehazelcast.config.UniqueKeyTransformationfor possible values.
- async add_interceptor(interceptor: Any) str[source]
Adds an interceptor for this map.
Added interceptor will intercept operations and execute user defined methods.
- Parameters:
interceptor – Interceptor for the map which includes user defined methods.
- Returns:
Id of registered interceptor.
- async aggregate(aggregator: Aggregator[AggregatorResultType], predicate: Predicate = None) AggregatorResultType[source]
Applies the aggregation logic on map entries and filter the result with the predicate, if given.
- Parameters:
aggregator – Aggregator to aggregate the entries with.
predicate – Predicate to filter the entries with.
- Returns:
The result of the aggregation.
- async clear() None[source]
Clears the map.
The
MAP_CLEAREDevent is fired for any registered listeners.
- async contains_key(key: KeyType) bool[source]
Determines whether this map contains an entry with the key.
Warning
This method uses
__hash__and__eq__methods of binary form of the key, not the actual implementations of__hash__and__eq__defined in key’s class.- Parameters:
key – The specified key.
- Returns:
Trueif this map contains an entry for the specified key,Falseotherwise.
- async contains_value(value: ValueType) bool[source]
Determines whether this map contains one or more keys for the specified value.
- Parameters:
value – The specified value.
- Returns:
Trueif this map contains an entry for the specified value,Falseotherwise.
- async delete(key: KeyType) None[source]
Removes the mapping for a key from this map if it is present (optional operation).
Unlike remove(object), this operation does not return the removed value, which avoids the serialization cost of the returned value. If the removed value will not be used, a delete operation is preferred over a remove operation for better performance.
The map will not contain a mapping for the specified key once the call returns.
Warning
This method breaks the contract of EntryListener. When an entry is removed by delete(), it fires an
EntryEventwith aNoneold_value. Also, a listener with predicates will haveNonevalues, so only the keys can be queried via predicates.- Parameters:
key – Key of the mapping to be deleted.
- async entry_set(predicate: Predicate = None) List[Tuple[KeyType, ValueType]][source]
Returns a list clone of the mappings contained in this map.
Warning
The list is NOT backed by the map, so changes to the map are NOT reflected in the list, and vice-versa.
- Parameters:
predicate – Predicate for the map to filter entries.
- Returns:
The list of key-value tuples in the map.
- async evict(key: KeyType) bool[source]
Evicts the specified key from this map.
Warning
This method uses
__hash__and__eq__methods of binary form of the key, not the actual implementations of__hash__and__eq__defined in key’s class.- Parameters:
key – Key to evict.
- Returns:
Trueif the key is evicted,Falseotherwise.
- async evict_all() None[source]
Evicts all keys from this map except the locked ones.
The
EVICT_ALLevent is fired for any registered listeners.
- async execute_on_entries(entry_processor: Any, predicate: Predicate | None = None) List[Any][source]
Applies the user defined EntryProcessor to all the entries in the map or entries in the map which satisfies the predicate if provided. Returns the results mapped by each key in the map.
- Parameters:
entry_processor – A stateful serializable object which represents the EntryProcessor defined on server side. This object must have a serializable EntryProcessor counter part registered on server side with the actual
com.hazelcast.map.EntryProcessorimplementation.predicate – Predicate for filtering the entries.
- Returns:
List of map entries which includes the keys and the results of the entry process.
- async execute_on_key(key: KeyType, entry_processor: Any) Any[source]
Applies the user defined EntryProcessor to the entry mapped by the key. Returns the object which is the result of EntryProcessor’s process method.
- Parameters:
key – Specified key for the entry to be processed.
entry_processor – A stateful serializable object which represents the EntryProcessor defined on server side. This object must have a serializable EntryProcessor counter part registered on server side with the actual
com.hazelcast.map.EntryProcessorimplementation.
- Returns:
Result of entry process.
- async execute_on_keys(keys: Sequence[KeyType], entry_processor: Any) List[Any][source]
Applies the user defined EntryProcessor to the entries mapped by the collection of keys. Returns the results mapped by each key in the collection.
- Parameters:
keys – Collection of the keys for the entries to be processed.
entry_processor – A stateful serializable object which represents the EntryProcessor defined on server side. This object must have a serializable EntryProcessor counter part registered on server side with the actual
com.hazelcast.map.EntryProcessorimplementation.
- Returns:
List of map entries which includes the keys and the results of the entry process.
- async get(key: KeyType) ValueType | None[source]
Returns the value for the specified key, or
Noneif this map does not contain this key.Warning
This method returns a clone of original value, modifying the returned value does not change the actual value in the map. One should put modified value back to make changes visible to all nodes.
>>> value = await my_map.get(key) >>> value.update_some_property() >>> await my_map.put(key,value)
Warning
This method uses
__hash__and__eq__methods of binary form of the key, not the actual implementations of__hash__and__eq__defined in key’s class.- Parameters:
key – The specified key.
- Returns:
The value for the specified key.
- async get_all(keys: Sequence[KeyType]) Dict[KeyType, ValueType][source]
Returns the entries for the given keys.
Warning
The returned map is NOT backed by the original map, so changes to the original map are NOT reflected in the returned map, and vice-versa.
Warning
This method uses
__hash__and__eq__methods of binary form of the key, not the actual implementations of__hash__and__eq__defined in key’s class.- Parameters:
keys – Keys to get.
- Returns:
Dictionary of map entries.
- async get_entry_view(key: KeyType) SimpleEntryView[KeyType, ValueType][source]
Returns the EntryView for the specified key.
Warning
This method returns a clone of original mapping, modifying the returned value does not change the actual value in the map. One should put modified value back to make changes visible to all nodes.
Warning
This method uses
__hash__and__eq__methods of binary form of the key, not the actual implementations of__hash__and__eq__defined in key’s class.- Parameters:
key – The key of the entry.
- Returns:
EntryView of the specified key.
- async is_empty() bool[source]
Returns whether this map contains no key-value mappings or not.
- Returns:
Trueif this map contains no key-value mappings,Falseotherwise.
- async key_set(predicate: Predicate | None = None) List[ValueType][source]
Returns a List clone of the keys contained in this map or the keys of the entries filtered with the predicate if provided.
Warning
The list is NOT backed by the map, so changes to the map are NOT reflected in the list, and vice-versa.
- Parameters:
predicate – Predicate to filter the entries.
- Returns:
A list of the clone of the keys.
- async load_all(keys: Sequence[KeyType] = None, replace_existing_values: bool = True) None[source]
Loads all keys from the store at server side or loads the given keys if provided.
- Parameters:
keys – Keys of the entry values to load.
replace_existing_values – Whether the existing values will be replaced or not with those loaded from the server side MapLoader.
- async project(projection: Projection[ProjectionType], predicate: Predicate = None) ProjectionType[source]
Applies the projection logic on map entries and filter the result with the predicate, if given.
- Parameters:
projection – Projection to project the entries with.
predicate – Predicate to filter the entries with.
- Returns:
The result of the projection.
- async put(key: KeyType, value: ValueType, ttl: float = None, max_idle: float = None) ValueType | None[source]
Associates the specified value with the specified key in this map.
If the map previously contained a mapping for the key, the old value is replaced by the specified value. If ttl is provided, entry will expire and get evicted after the ttl.
Warning
This method returns a clone of the previous value, not the original (identically equal) value previously put into the map.
Warning
This method uses
__hash__and__eq__methods of binary form of the key, not the actual implementations of__hash__and__eq__defined in key’s class.- Parameters:
key – The specified key.
value – The value to associate with the key.
ttl – Maximum time in seconds for this entry to stay in the map. If not provided, the value configured on the server side configuration will be used. Setting this to
0means infinite time-to-live.max_idle – Maximum time in seconds for this entry to stay idle in the map. If not provided, the value configured on the server side configuration will be used. Setting this to
0means infinite max idle time.
- Returns:
Previous value associated with key or
Noneif there was no mapping for key.
- async put_all(map: Dict[KeyType, ValueType]) None[source]
Copies all the mappings from the specified map to this map.
No atomicity guarantees are given. In the case of a failure, some key-value tuples may get written, while others are not.
- Parameters:
map – Dictionary which includes mappings to be stored in this map.
- async put_if_absent(key: KeyType, value: ValueType, ttl: float = None, max_idle: float = None) ValueType | None[source]
Associates the specified key with the given value if it is not already associated.
If ttl is provided, entry will expire and get evicted after the ttl.
This is equivalent to below, except that the action is performed atomically:
>>> if not (await my_map.contains_key(key)): >>> return await my_map.put(key,value) >>> else: >>> return await my_map.get(key)
Warning
This method returns a clone of the previous value, not the original (identically equal) value previously put into the map.
Warning
This method uses
__hash__and__eq__methods of binary form of the key, not the actual implementations of__hash__and__eq__defined in key’s class.- Parameters:
key – Key of the entry.
value – Value of the entry.
ttl – Maximum time in seconds for this entry to stay in the map. If not provided, the value configured on the server side configuration will be used. Setting this to
0means infinite time-to-live.max_idle – Maximum time in seconds for this entry to stay idle in the map. If not provided, the value configured on the server side configuration will be used. Setting this to
0means infinite max idle time.
- Returns:
Old value of the entry.
- async put_transient(key: KeyType, value: ValueType, ttl: float = None, max_idle: float = None) None[source]
Same as
put, but MapStore defined at the server side will not be called.Warning
This method uses
__hash__and__eq__methods of binary form of the key, not the actual implementations of__hash__and__eq__defined in key’s class.- Parameters:
key – Key of the entry.
value – Value of the entry.
ttl – Maximum time in seconds for this entry to stay in the map. If not provided, the value configured on the server side configuration will be used. Setting this to
0means infinite time-to-live.max_idle – Maximum time in seconds for this entry to stay idle in the map. If not provided, the value configured on the server side configuration will be used. Setting this to
0means infinite max idle time.
- async remove(key: KeyType) ValueType | None[source]
Removes the mapping for a key from this map if it is present.
The map will not contain a mapping for the specified key once the call returns.
Warning
This method uses
__hash__and__eq__methods of binary form of the key, not the actual implementations of__hash__and__eq__defined in key’s class.- Parameters:
key – Key of the mapping to be deleted.
- Returns:
The previous value associated with key, or
Noneif there was no mapping for key.
- async remove_all(predicate: Predicate) None[source]
Removes all entries which match with the supplied predicate.
- Parameters:
predicate – Used to select entries to be removed from map.
- async remove_if_same(key: KeyType, value: ValueType) bool[source]
Removes the entry for a key only if it is currently mapped to a given value.
This is equivalent to below, except that the action is performed atomically:
>>> if (await my_map.contains_key(key)) and (await my_map.get(key) == value): >>> await my_map.remove(key) >>> return True >>> else: >>> return False
Warning
This method uses
__hash__and__eq__methods of binary form of the key, not the actual implementations of__hash__and__eq__defined in key’s class.- Parameters:
key – The specified key.
value – Remove the key if it has this value.
- Returns:
Trueif the value was removed,Falseotherwise.
- async remove_entry_listener(registration_id: str) bool[source]
Removes the specified entry listener.
Returns silently if there is no such listener added before.
- Parameters:
registration_id – Id of registered listener.
- Returns:
Trueif registration is removed,Falseotherwise.
- async remove_interceptor(registration_id: str) bool[source]
Removes the given interceptor for this map, so it will not intercept operations anymore.
- Parameters:
registration_id – Registration ID of the map interceptor.
- Returns:
Trueif the interceptor is removed,Falseotherwise.
- async replace(key: KeyType, value: ValueType) ValueType | None[source]
Replaces the entry for a key only if it is currently mapped to some value.
This is equivalent to below, except that the action is performed atomically:
>>> if await my_map.contains_key(key): >>> return await my_map.put(key,value) >>> else: >>> return None
Warning
This method uses
__hash__and__eq__methods of binary form of the key, not the actual implementations of__hash__and__eq__defined in key’s class.Warning
This method returns a clone of the previous value, not the original (identically equal) value previously put into the map.
- Parameters:
key – The specified key.
value – The value to replace the previous value.
- Returns:
Previous value associated with key, or
Noneif there was no mapping for key.
- async replace_if_same(key: ValueType, old_value: ValueType, new_value: ValueType) bool[source]
Replaces the entry for a key only if it is currently mapped to a given value.
This is equivalent to below, except that the action is performed atomically:
>>> if (await my_map.contains_key(key)) and (await my_map.get(key) == old_value): >>> await my_map.put(key, new_value) >>> return True >>> else: >>> return False
Warning
This method uses
__hash__and__eq__methods of binary form of the key, not the actual implementations of__hash__and__eq__defined in key’s class.- Parameters:
key – The specified key.
old_value – Replace the key value if it is the old value.
new_value – The new value to replace the old value.
- Returns:
Trueif the value was replaced,Falseotherwise.
- async set(key: KeyType, value: ValueType, ttl: float = None, max_idle: float = None) None[source]
Puts an entry into this map.
Similar to the put operation except that set doesn’t return the old value, which is more efficient. If ttl is provided, entry will expire and get evicted after the ttl.
Warning
This method uses
__hash__and__eq__methods of binary form of the key, not the actual implementations of__hash__and__eq__defined in key’s class.- Parameters:
key – Key of the entry.
value – Value of the entry.
ttl – Maximum time in seconds for this entry to stay in the map. If not provided, the value configured on the server side configuration will be used. Setting this to
0means infinite time-to-live.max_idle – Maximum time in seconds for this entry to stay idle in the map. If not provided, the value configured on the server side configuration will be used. Setting this to
0means infinite max idle time.
- async set_ttl(key: KeyType, ttl: float) None[source]
Updates the TTL (time to live) value of the entry specified by the given key with a new TTL value.
New TTL value is valid starting from the time this operation is invoked, not since the time the entry was created. If the entry does not exist or is already expired, this call has no effect.
- Parameters:
key – The key of the map entry.
ttl – Maximum time in seconds for this entry to stay in the map. Setting this to
0means infinite time-to-live.
- async size() int[source]
Returns the number of entries in this map.
- Returns:
Number of entries in this map.
- async try_put(key: KeyType, value: ValueType, timeout: float = 0) bool[source]
Tries to put the given key and value into this map and returns immediately if timeout is not provided.
If timeout is provided, operation waits until it is completed or timeout is reached.
- Parameters:
key – Key of the entry.
value – Value of the entry.
timeout – Maximum time in seconds to wait.
- Returns:
Trueif the put is successful,Falseotherwise.
- async try_remove(key: KeyType, timeout: float = 0) bool[source]
Tries to remove the given key from this map and returns immediately if timeout is not provided.
If timeout is provided, operation waits until it is completed or timeout is reached.
- Parameters:
key – Key of the entry to be deleted.
timeout – Maximum time in seconds to wait.
- Returns:
Trueif the remove is successful,Falseotherwise.
- async values(predicate: Predicate = None) List[ValueType][source]
Returns a list clone of the values contained in this map or values of the entries which are filtered with the predicate if provided.
Warning
The list is NOT backed by the map, so changes to the map are NOT reflected in the list, and vice-versa.
- Parameters:
predicate – Predicate to filter the entries.
- Returns:
A list of clone of the values contained in this map.
- class ReliableMessageListener[source]
Bases:
Generic[MessageType]A message listener for
ReliableTopic.A message listener will not be called concurrently (provided that it’s not registered twice). So there is no need to synchronize access to the state it reads or writes.
If a regular function is registered on a reliable topic, the message listener works fine, but it can’t do much more than listen to messages.
This is an enhanced version of that to better integrate with the reliable topic.
Durable Subscription
The ReliableMessageListener allows you to control where you want to start processing a message when the listener is registered. This makes it possible to create a durable subscription by storing the sequence of the last message and using this as the sequence id to start from.
Error handling
The ReliableMessageListener also gives the ability to deal with errors using the
is_terminal()method. If a plain function is used, then it won’t terminate on errors and it will keep on running. But in some cases it is better to stop running.Global order
The ReliableMessageListener will always get all events in order (global order). It will not get duplicates and there will only be gaps if it is too slow. For more information see
is_loss_tolerant().Delivery guarantees
Because the ReliableMessageListener controls which item it wants to continue from upon restart, it is very easy to provide an at-least-once or at-most-once delivery guarantee. The
store_sequence()is always called before a message is processed; so it can be persisted on some non-volatile storage. When theretrieve_initial_sequence()returns the stored sequence, then an at-least-once delivery is implemented since the same item is now being processed twice. To implement an at-most-once delivery guarantee, add 1 to the stored sequence when theretrieve_initial_sequence()is called.- on_message(message: TopicMessage[MessageType]) None[source]
Invoked when a message is received for the added reliable topic.
One should not block in this callback. If blocking is necessary, consider delegating that task to an executor or a thread pool.
- Parameters:
message – The message that is received for the topic
- retrieve_initial_sequence() int[source]
Retrieves the initial sequence from which this ReliableMessageListener should start.
Return
-1if there is no initial sequence and you want to start from the next published message.If you intend to create a durable subscriber so you continue from where you stopped the previous time, load the previous sequence and add
1. If you don’t add one, then you will be receiving the same message twice.- Returns:
The initial sequence.
- store_sequence(sequence: int) None[source]
Informs the ReliableMessageListener that it should store the sequence. This method is called before the message is processed. Can be used to make a durable subscription.
- Parameters:
sequence – The sequence.
- is_loss_tolerant() bool[source]
Checks if this ReliableMessageListener is able to deal with message loss. Even though the reliable topic promises to be reliable, it can be that a ReliableMessageListener is too slow. Eventually the message won’t be available anymore.
If the ReliableMessageListener is not loss tolerant and the topic detects that there are missing messages, it will terminate the ReliableMessageListener.
- Returns:
Trueif the ReliableMessageListener is tolerant towards losing messages.
- is_terminal(error: Exception) bool[source]
Checks if the ReliableMessageListener should be terminated based on an error raised while calling
on_message().- Parameters:
error – The error raised while calling
on_message()- Returns:
Trueif the ReliableMessageListener should terminate itself,Falseif it should keep on running.
- class ReliableTopic(service_name, name, context, ringbuffer)[source]
Bases:
Proxy,Generic[MessageType]Hazelcast provides distribution mechanism for publishing messages that are delivered to multiple subscribers, which is also known as a publish/subscribe (pub/sub) messaging model. Publish and subscriptions are cluster-wide. When a member subscribes for a topic, it is actually registering for messages published by any member in the cluster, including the new members joined after you added the listener.
Messages are ordered, meaning that listeners(subscribers) will process the messages in the order they are actually published.
Hazelcast’s Reliable Topic uses the same Topic interface as a regular topic. The main difference is that Reliable Topic is backed up by the Ringbuffer data structure, a replicated but not partitioned data structure that stores its data in a ring-like structure.
- async publish(message: MessageType) None[source]
Publishes the message to all subscribers of this topic.
- Parameters:
message – The message.
- async publish_all(messages: Sequence[MessageType]) None[source]
Publishes all messages to all subscribers of this topic.
- Parameters:
messages – Messages to publish.
- async add_listener(listener: ReliableMessageListener | Callable[[TopicMessage[MessageType]], None]) str[source]
Subscribes to this reliable topic.
It can be either a simple function or an instance of an
ReliableMessageListener. When a function is passed, aReliableMessageListeneris created out of that with sensible default values.When a message is published, the
ReliableMessageListener.on_message()method of the given listener (or the function passed) is called.More than one message listener can be added on one instance.
- Parameters:
listener – Listener to add.
- Returns:
The registration id.
- class MultiMap(service_name, name, context)[source]
Bases:
Proxy,Generic[KeyType,ValueType]A specialized map whose keys can be associated with multiple values.
Example
>>> my_map = await client.get_multi_map("my_map") >>> print("put", await my_map.put("key", "value1")) >>> print("get", await my_map.get("key"))
Warning
Asyncio client multi map proxy is not thread-safe, do not access it from other threads.
- async add_entry_listener(include_value: bool = False, key: KeyType = None, added_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None, removed_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None, clear_all_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None) str[source]
Adds an entry listener for this multimap.
The listener will be notified for all multimap add/remove/clear-all events.
- Parameters:
include_value – Whether received event should include the value or not.
key – Key for filtering the events.
added_func – Function to be called when an entry is added to map.
removed_func – Function to be called when an entry is removed from map.
clear_all_func – Function to be called when entries are cleared from map.
- Returns:
A registration id which is used as a key to remove the listener.
- async contains_key(key: KeyType) bool[source]
Determines whether this multimap contains an entry with the key.
Warning
This method uses
__hash__and__eq__methods of binary form of the key, not the actual implementations of__hash__and__eq__defined in key’s class.- Parameters:
key – The specified key.
- Returns:
Trueif this multimap contains an entry for the specified key,Falseotherwise.
- async contains_value(value: ValueType) bool[source]
Determines whether this map contains one or more keys for the specified value.
- Parameters:
value – The specified value.
- Returns:
Trueif this multimap contains an entry for the specified value,Falseotherwise.
- async contains_entry(key: KeyType, value: ValueType) bool[source]
Returns whether the multimap contains an entry with the value.
- Parameters:
key – The specified key.
value – The specified value.
- Returns:
Trueif this multimap contains the key-value tuple,Falseotherwise.
- async entry_set() List[Tuple[KeyType, ValueType]][source]
Returns the list of key-value tuples in the multimap.
Warning
The list is NOT backed by the map, so changes to the map are NOT reflected in the list, and vice-versa.
- Returns:
The list of key-value tuples in the multimap.
- async get(key: KeyType) List[ValueType] | None[source]
Returns the list of values associated with the key.
Noneif this map does not contain this key.Warning
This method uses
__hash__and__eq__of the binary form of the key, not the actual implementations of__hash__and__eq__defined in the key’s class.Warning
The list is NOT backed by the multimap, so changes to the map are not reflected in the collection, and vice-versa.
- Parameters:
key – The specified key.
- Returns:
The list of the values associated with the specified key.
- async key_set() List[KeyType][source]
Returns the list of keys in the multimap.
Warning
The list is NOT backed by the map, so changes to the map are NOT reflected in the list, and vice-versa.
- Returns:
A list of the clone of the keys.
- async remove(key: KeyType, value: ValueType) bool[source]
Removes the given key-value tuple from the multimap.
Warning
This method uses
__hash__and__eq__methods of binary form of the key, not the actual implementations of__hash__and__eq__defined in key’s class.- Parameters:
key – The key of the entry to remove.
value – The value of the entry to remove.
- Returns:
Trueif the size of the multimap changed after the remove operation,Falseotherwise.
- async remove_all(key: KeyType) List[ValueType][source]
Removes all the entries with the given key and returns the value list associated with this key.
Warning
This method uses
__hash__and__eq__methods of binary form of the key, not the actual implementations of__hash__and__eq__defined in key’s class.Warning
The returned list is NOT backed by the map, so changes to the map are NOT reflected in the list, and vice-versa.
- Parameters:
key – The key of the entries to remove.
- Returns:
The collection of removed values associated with the given key.
- async put(key: KeyType, value: ValueType) bool[source]
Stores a key-value tuple in the multimap.
Warning
This method uses
__hash__and__eq__methods of binary form of the key, not the actual implementations of__hash__and__eq__defined in key’s class.- Parameters:
key – The key to be stored.
value – The value to be stored.
- Returns:
Trueif size of the multimap is increased,Falseif the multimap already contains the key-value tuple.
- async put_all(multimap: Dict[KeyType, Sequence[ValueType]]) None[source]
Stores the given Map in the MultiMap.
The results of concurrently mutating the given map are undefined. No atomicity guarantees are given. It could be that in case of failure some of the key/value-pairs get written, while others are not.
Warning
This method uses
__hash__and__eq__methods of binary form of the key, not the actual implementations of__hash__and__eq__defined in key’s class.- Parameters:
multimap – the map corresponds to multimap entries.
- async remove_entry_listener(registration_id: str) bool[source]
Removes the specified entry listener.
Returns silently if there is no such listener added before.
- Parameters:
registration_id – Id of registered listener.
- Returns:
Trueif registration is removed,Falseotherwise.
- async size() int[source]
Returns the number of entries in this multimap.
- Returns:
Number of entries in this multimap.
- async value_count(key: KeyType) int[source]
Returns the number of values that match the given key in the multimap.
Warning
This method uses
__hash__and__eq__methods of binary form of the key, not the actual implementations of__hash__and__eq__defined in key’s class.- Parameters:
key – The key whose values count is to be returned.
- Returns:
The number of values that match the given key in the multimap.
- class PNCounter(service_name, name, context)[source]
Bases:
ProxyPN (Positive-Negative) CRDT counter.
The counter supports adding and subtracting values as well as retrieving the current counter value. Each replica of this counter can perform operations locally without coordination with the other replicas, thus increasing availability. The counter guarantees that whenever two nodes have received the same set of updates, possibly in a different order, their state is identical, and any conflicting updates are merged automatically. If no new updates are made to the shared state, all nodes that can communicate will eventually have the same data.
When invoking updates from the client, the invocation is remote. This may lead to indeterminate state - the update may be applied but the response has not been received. In this case, the caller will be notified with a TargetDisconnectedError.
The read and write methods provide monotonic read and RYW (read-your-write) guarantees. These guarantees are session guarantees which means that if no replica with the previously observed state is reachable, the session guarantees are lost and the method invocation will throw a ConsistencyLostError. This does not mean that an update is lost. All of the updates are part of some replica and will be eventually reflected in the state of all other replicas. This exception just means that you cannot observe your own writes because all replicas that contain your updates are currently unreachable. After you have received a ConsistencyLostError, you can either wait for a sufficiently up-to-date replica to become reachable in which case the session can be continued or you can reset the session by calling the reset() method. If you have called the reset() method, a new session is started with the next invocation to a CRDT replica.
Notes
The CRDT state is kept entirely on non-lite (data) members. If there aren’t any and the methods here are invoked on a lite member, they will fail with an NoDataMemberInClusterError.
- async get() int[source]
Returns the current value of the counter.
- Returns:
The current value of the counter.
- Raises:
NoDataMemberInClusterError – if the cluster does not contain any data members.
ConsistencyLostError – if the session guarantees have been lost.
- async get_and_add(delta: int) int[source]
Adds the given value to the current value and returns the previous value.
- Parameters:
delta – The value to add.
- Returns:
The previous value.
- Raises:
NoDataMemberInClusterError – if the cluster does not contain any data members.
ConsistencyLostError – if the session guarantees have been lost.
- async add_and_get(delta: int) int[source]
Adds the given value to the current value and returns the updated value.
- Parameters:
delta – The value to add.
- Returns:
The updated value.
- Raises:
NoDataMemberInClusterError – if the cluster does not contain any data members.
ConsistencyLostError – if the session guarantees have been lost.
- async get_and_subtract(delta: int) int[source]
Subtracts the given value from the current value and returns the previous value.
- Parameters:
delta – The value to subtract.
- Returns:
The previous value.
- Raises:
NoDataMemberInClusterError – if the cluster does not contain any data members.
ConsistencyLostError – if the session guarantees have been lost.
- async subtract_and_get(delta: int) int[source]
Subtracts the given value from the current value and returns the updated value.
- Parameters:
delta – The value to subtract.
- Returns:
The updated value.
- Raises:
NoDataMemberInClusterError – if the cluster does not contain any data members.
ConsistencyLostError – if the session guarantees have been lost.
- async get_and_decrement() int[source]
Decrements the counter value by one and returns the previous value.
- Returns:
The previous value.
- Raises:
NoDataMemberInClusterError – if the cluster does not contain any data members.
ConsistencyLostError – if the session guarantees have been lost.
- async decrement_and_get() int[source]
Decrements the counter value by one and returns the updated value.
- Returns:
The updated value.
- Raises:
NoDataMemberInClusterError – if the cluster does not contain any data members.
ConsistencyLostError – if the session guarantees have been lost.
- async get_and_increment() int[source]
Increments the counter value by one and returns the previous value.
- Returns:
The previous value.
- Raises:
NoDataMemberInClusterError – if the cluster does not contain any data members.
ConsistencyLostError – if the session guarantees have been lost.
- async increment_and_get() int[source]
Increments the counter value by one and returns the updated value.
- Returns:
The updated value.
- Raises:
NoDataMemberInClusterError – if the cluster does not contain any data members.
ConsistencyLostError – if the session guarantees have been lost.
- class Queue(service_name, name, context)[source]
Bases:
PartitionSpecificProxy,Generic[ItemType]Concurrent, blocking, distributed, observable queue.
Queue is not a partitioned data-structure. All of the Queue content is stored in a single machine (and in the backup). Queue will not scale by adding more members in the cluster.
Example
>>> my_queue = await client.get_queue("my_queue") >>> print("queue.offer", await my_queue.offer("item")) >>> print("queue.size", await my_queue.size())
Warning
Asyncio client queue proxy is not thread-safe, do not access it from other threads.
- async add(item: ItemType) bool[source]
Adds the specified item to this queue if there is available space.
- Parameters:
item – The specified item.
- Returns:
Trueif element is successfully added,Falseotherwise.- Raises:
IllegalStateError – If queue is full.
- async add_all(items: Sequence[ItemType]) bool[source]
Adds the elements in the specified collection to this queue.
- Parameters:
items – Collection which includes the items to be added.
- Returns:
Trueif this queue is changed after call,Falseotherwise.
- async add_listener(include_value: bool = False, item_added_func: Callable[[ItemEvent[ItemType]], None] = None, item_removed_func: Callable[[ItemEvent[ItemType]], None] = None) str[source]
Adds an item listener for this queue. Listener will be notified for all queue add/remove events.
- Parameters:
include_value – Whether received events include the updated item or not.
item_added_func – Function to be called when an item is added to this queue.
item_removed_func – Function to be called when an item is deleted from this queue.
- Returns:
A registration id which is used as a key to remove the listener.
- async contains(item: ItemType) bool[source]
Determines whether this queue contains the specified item or not.
- Parameters:
item – The specified item to be searched.
- Returns:
Trueif the specified item exists in this queue,Falseotherwise.
- async contains_all(items: Sequence[ItemType]) bool[source]
Determines whether this queue contains all of the items in the specified collection or not.
- Parameters:
items – The specified collection which includes the items to be searched.
- Returns:
Trueif all of the items in the specified collection exist in this queue,Falseotherwise.
- async drain_to(target_list: List[ItemType], max_size: int = -1) int[source]
Transfers all available items to the given target_list and removes these items from this queue.
If a max_size is specified, it transfers at most the given number of items. In case of a failure, an item can exist in both collections or none of them.
This operation may be more efficient than polling elements repeatedly and putting into collection.
- Parameters:
target_list – the list where the items in this queue will be transferred.
max_size – The maximum number items to transfer.
- Returns:
Number of transferred items.
- async iterator() List[ItemType][source]
Returns all the items in this queue.
- Returns:
Collection of items in this queue.
- async is_empty() bool[source]
Determines whether this queue is empty or not.
- Returns:
Trueif this queue is empty,Falseotherwise.
- async offer(item: ItemType, timeout: float = 0) bool[source]
Inserts the specified element into this queue if it is possible to do so immediately without violating capacity restrictions.
If there is no space currently available:
If the timeout is provided, it waits until this timeout elapses and returns the result.
If the timeout is not provided, returns
Falseimmediately.
- Parameters:
item – The item to be added.
timeout – Maximum time in seconds to wait for addition.
- Returns:
Trueif the element was added to this queue,Falseotherwise.
- async peek() ItemType | None[source]
Retrieves the head of queue without removing it from the queue.
- Returns:
The head of this queue, or
Noneif this queue is empty.
- async poll(timeout: float = 0) ItemType | None[source]
Retrieves and removes the head of this queue.
If this queue is empty:
If the timeout is provided, it waits until this timeout elapses and returns the result.
If the timeout is not provided, returns
None.
- Parameters:
timeout – Maximum time in seconds to wait for addition.
- Returns:
The head of this queue, or
Noneif this queue is empty or specified timeout elapses before an item is added to the queue.
- async put(item: ItemType) None[source]
Adds the specified element into this queue.
If there is no space, it waits until necessary space becomes available.
- Parameters:
item – The specified item.
- async remaining_capacity() int[source]
Returns the remaining capacity of this queue.
- Returns:
Remaining capacity of this queue.
- async remove(item: ItemType) bool[source]
Removes the specified element from the queue if it exists.
- Parameters:
item – The specified element to be removed.
- Returns:
Trueif the specified element exists in this queue,Falseotherwise.
- async remove_all(items: Sequence[ItemType]) bool[source]
Removes all of the elements of the specified collection from this queue.
- Parameters:
items – The specified collection.
- Returns:
Trueif the call changed this queue,Falseotherwise.
- async remove_listener(registration_id: str) bool[source]
Removes the specified item listener.
Returns silently if the specified listener was not added before.
- Parameters:
registration_id – Id of the listener to be deleted.
- Returns:
Trueif the item listener is removed,Falseotherwise.
- async retain_all(items: Sequence[ItemType]) bool[source]
Removes the items which are not contained in the specified collection.
In other words, only the items that are contained in the specified collection will be retained.
- Parameters:
items – Collection which includes the elements to be retained in this queue.
- Returns:
Trueif this queue changed as a result of the call,Falseotherwise.
- class ReplicatedMap(service_name, name, context)[source]
Bases:
Proxy,Generic[KeyType,ValueType]A ReplicatedMap is a map-like data structure with weak consistency and values locally stored on every node of the cluster.
Whenever a value is written asynchronously, the new value will be internally distributed to all existing cluster members, and eventually every node will have the new value.
When a new node joins the cluster, the new node initially will request existing values from older nodes and replicate them locally.
Example
>>> my_map = await client.get_replicated_map("my_map") >>> print("put", await my_map.put("key", "value")) >>> print("get", await my_map.get("key"))
- async add_entry_listener(key: KeyType = None, predicate: Predicate = None, added_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None, removed_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None, updated_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None, evicted_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None, clear_all_func: Callable[[EntryEvent[KeyType, ValueType]], None] = None) str[source]
Adds a continuous entry listener for this map.
Listener will get notified for map events filtered with given parameters.
- Parameters:
key – Key for filtering the events.
predicate – Predicate for filtering the events.
added_func – Function to be called when an entry is added to map.
removed_func – Function to be called when an entry is removed from map.
updated_func – Function to be called when an entry is updated.
evicted_func – Function to be called when an entry is evicted from map.
clear_all_func – Function to be called when entries are cleared from map.
- Returns:
A registration id which is used as a key to remove the listener.
- async contains_key(key: KeyType) bool[source]
Determines whether this map contains an entry with the key.
Warning
This method uses
__hash__and__eq__methods of binary form of the key, not the actual implementations of__hash__and__eq__defined in key’s class.- Parameters:
key – The specified key.
- Returns:
Trueif this map contains an entry for the specified key,Falseotherwise.
- async contains_value(value: ValueType) bool[source]
Determines whether this map contains one or more keys for the specified value.
- Parameters:
value – The specified value.
- Returns:
Trueif this map contains an entry for the specified value,Falseotherwise.
- async entry_set() List[Tuple[KeyType, ValueType]][source]
Returns a List clone of the mappings contained in this map.
Warning
The list is NOT backed by the map, so changes to the map are NOT reflected in the list, and vice-versa.
- Returns:
The list of key-value tuples in the map.
- async get(key: KeyType) ValueType | None[source]
Returns the value for the specified key, or
Noneif this map does not contain this key.Warning
This method uses
__hash__and__eq__methods of binary form of the key, not the actual implementations of__hash__and__eq__defined in key’s class.- Parameters:
key – The specified key.
- Returns:
The value associated with the specified key.
- async is_empty() bool[source]
Returns
Trueif this map contains no key-value mappings.- Returns:
Trueif this map contains no key-value mappings.
- async key_set() List[KeyType][source]
Returns the list of keys in the ReplicatedMap.
Warning
The list is NOT backed by the map, so changes to the map are NOT reflected in the list, and vice-versa.
- Returns:
A list of the clone of the keys.
- async put(key: KeyType, value: ValueType, ttl: float = 0) ValueType | None[source]
Associates the specified value with the specified key in this map.
If the map previously contained a mapping for the key, the old value is replaced by the specified value. If ttl is provided, entry will expire and get evicted after the ttl.
- Parameters:
key – The specified key.
value – The value to associate with the key.
ttl – Maximum time in seconds for this entry to stay, if not provided, the value configured on server side configuration will be used.
- Returns:
Previous value associated with key or
Noneif there was no mapping for key.
- async put_all(source: Dict[KeyType, ValueType]) None[source]
Copies all the mappings from the specified map to this map.
No atomicity guarantees are given. In the case of a failure, some key-value tuples may get written, while others are not.
- Parameters:
source – Map which includes mappings to be stored in this map.
- async remove(key: KeyType) ValueType | None[source]
Removes the mapping for a key from this map if it is present.
The map will not contain a mapping for the specified key once the call returns.
Warning
This method uses
__hash__and__eq__methods of binary form of the key, not the actual implementations of__hash__and__eq__defined in key’s class.- Parameters:
key – Key of the mapping to be deleted.
- Returns:
The previous value associated with key, or
Noneif there was no mapping for key.
- async remove_entry_listener(registration_id: str) bool[source]
Removes the specified entry listener.
Returns silently if there is no such listener added before.
- Parameters:
registration_id – Id of registered listener.
- Returns:
Trueif registration is removed,Falseotherwise.
- class Ringbuffer(service_name, name, context)[source]
Bases:
PartitionSpecificProxy,Generic[ItemType]A Ringbuffer is an append-only data-structure where the content is stored in a ring like structure.
A ringbuffer has a capacity so it won’t grow beyond that capacity and endanger the stability of the system. If that capacity is exceeded, then the oldest item in the ringbuffer is overwritten. The ringbuffer has two always incrementing sequences:
tail_sequence(): This is the side where the youngest item is found. So the tail is the side of the ringbuffer where items are added to.head_sequence(): This is the side where the oldest items are found. So the head is the side where items gets discarded.
The items in the ringbuffer can be found by a sequence that is in between (inclusive) the head and tail sequence.
If data is read from a ringbuffer with a sequence that is smaller than the head sequence, it means that the data is not available anymore and a
hazelcast.errors.StaleSequenceErroris thrown.A Ringbuffer currently is a replicated, but not partitioned data structure. So all data is stored in a single partition, similarly to the
hazelcast.internal.asyncio_proxy.queue.Queueimplementation.A Ringbuffer can be used in a way similar to the Queue, but one of the key differences is that a
hazelcast.internal.asyncio_proxy.queue.Queue.take()is destructive, meaning that only 1 consumer is able to take an item. Aread_one()is not destructive, so you can have multiple consumers reading the same item multiple times.Example
>>> rb = await client.get_ringbuffer("my_ringbuffer") >>> await rb.add("item") >>> print("read_one", await rb.read_one(0))
- async capacity() int[source]
Returns the capacity of this Ringbuffer.
- Returns:
The capacity of Ringbuffer.
- async size() int[source]
Returns number of items in the Ringbuffer.
- Returns:
The size of Ringbuffer.
- async tail_sequence() int[source]
Returns the sequence of the tail.
The tail is the side of the Ringbuffer where the items are added to. The initial value of the tail is
-1.- Returns:
The sequence of the tail.
- async head_sequence() int[source]
Returns the sequence of the head.
The head is the side of the Ringbuffer where the oldest items in the Ringbuffer are found. If the Ringbuffer is empty, the head will be one more than the tail. The initial value of the head is
0(1more than tail).- Returns:
The sequence of the head.
- async remaining_capacity() int[source]
Returns the remaining capacity of the Ringbuffer.
- Returns:
The remaining capacity of Ringbuffer.
- async add(item, overflow_policy: int = 0) int[source]
Adds the specified item to the tail of the Ringbuffer.
If there is no space in the Ringbuffer, the action is determined by
overflow_policy.- Parameters:
item – The specified item to be added.
overflow_policy – the OverflowPolicy to be used when there is no space.
- Returns:
The sequenceId of the added item, or
-1if the add failed.
- async add_all(items: Sequence[ItemType], overflow_policy: int = 0) int[source]
Adds all items in the specified collection to the tail of the Ringbuffer.
This is likely to outperform multiple calls to
add()due to better io utilization and a reduced number of executed operations. The items are added in the order of the Iterator of the collection.If there is no space in the Ringbuffer, the action is determined by
overflow_policy.- Parameters:
items – The specified collection which contains the items to be added.
overflow_policy – The OverflowPolicy to be used when there is no space.
- Returns:
The sequenceId of the last written item, or
-1of the last write is failed.
- async read_one(sequence: int) ItemType[source]
Reads one item from the Ringbuffer.
If the sequence is one beyond the current tail, this call blocks until an item is added. Currently, it isn’t possible to control how long this call is going to block.
- Parameters:
sequence – The sequence of the item to read.
- Returns:
The read item.
- async read_many(start_sequence: int, min_count: int, max_count: int, filter: Any = None) ReadResult[source]
Reads a batch of items from the Ringbuffer.
If the number of available items after the first read item is smaller than
max_count, these items are returned. So, number of items read may be smaller thanmax_count. If there are fewer items available thanmin_count, then this call blocks.Warning
These blocking calls consume server memory and if there are many calls, an
OutOfMemoryErrormay be thrown on server-side.Reading a batch of items is likely to perform better because less overhead is involved.
A filter can be provided to select items that need to be read. If the filter is
None, all items are read. If the filter is notNone, items where the filter function returns true are returned. Using filters is a good way to prevent getting items that are of no value to the receiver. This reduces the amount of IO and the number of operations being executed, and can result in a significant performance improvement. Note that, filtering logic must be defined on the server-side.If the
start_sequenceis smaller than the smallest sequence still available in the Ringbuffer (head_sequence()), then the smallest available sequence will be used as the start sequence and the minimum/maximum number of items will be attempted to be read from there on.If the
start_sequenceis bigger than the last available sequence in the Ringbuffer (tail_sequence()), then the last available sequence plus one will be used as the start sequence and the call will block until further items become available and it can read at least the minimum number of items.- Parameters:
start_sequence – The start sequence of the first item to read.
min_count – The minimum number of items to read.
max_count – The maximum number of items to read.
filter – Filter to select returned elements.
- Returns:
The list of read items.
- class Semaphore(context, group_id, service_name, proxy_name, object_name)[source]
Bases:
BaseCPProxyA linearizable, distributed semaphore.
Semaphores are often used to restrict the number of callers that can access some physical or logical resource.
Semaphore is a cluster-wide counting semaphore. Conceptually, it maintains a set of permits. Each
acquire()blocks if necessary until a permit is available, and then takes it. Dually, eachrelease()adds a permit, potentially releasing a blocking acquirer. However, no actual permit objects are used; the semaphore just keeps a count of the number available and acts accordingly.Hazelcast’s distributed semaphore implementation guarantees that callers invoking any of the
acquire()methods are selected to obtain permits in the order of their invocations (first-in-first-out; FIFO). Note that FIFO ordering implies the order which the primary replica of an Semaphore receives these acquire requests. Therefore, it is possible for one member to invokeacquire()before another member, but its request hits the primary replica after the other member.This class also provides convenient ways to work with multiple permits at once. Beware of the increased risk of indefinite postponement when using the multiple-permit acquire. If permits are released one by one, a caller waiting for one permit will acquire it before a caller waiting for multiple permits regardless of the call order.
Correct usage of a semaphore is established by programming convention in the application.
It works on top of the Raft consensus algorithm. It offers linearizability during crash failures and network partitions. It is CP with respect to the CAP principle. If a network partition occurs, it remains available on at most one side of the partition.
It has 2 variations:
The default implementation accessed via
cp_subsystemis session-aware. In this one, when a caller makes its very firstacquire()call, it starts a new CP session with the underlying CP group. Then, liveliness of the caller is tracked via this CP session. When the caller fails, permits acquired by this caller are automatically and safely released. However, the session-aware version comes with a limitation, that is, a client cannot release permits before acquiring them first. In other words, a client can release only the permits it has acquired earlier. It means, you can acquire a permit from one thread and release it from another thread using the same Hazelcast client, but not different instances of Hazelcast client. You can use the session-aware CP Semaphore implementation by disabling JDK compatibility viajdk-compatibleserver-side setting. Although the session-aware implementation has a minor difference to the JDK Semaphore, we think it is a better fit for distributed environments because of its safe auto-cleanup mechanism for acquired permits.The second implementation offered by
cp_subsystemis sessionless. This implementation does not perform auto-cleanup of acquired permits on failures. Acquired permits are not bound to threads and permits can be released without acquiring first. However, you need to handle failed permit owners on your own. If a Hazelcast server or a client fails while holding some permits, they will not be automatically released. You can use the sessionless CP Semaphore implementation by enabling JDK compatibility viajdk-compatibleserver-side setting.
There is a subtle difference between the lock and semaphore abstractions. A lock can be assigned to at most one endpoint at a time, so we have a total order among its holders. However, permits of a semaphore can be assigned to multiple endpoints at a time, which implies that we may not have a total order among permit holders. In fact, permit holders are partially ordered. For this reason, the fencing token approach, which is explained in
FencedLock, does not work for the semaphore abstraction. Moreover, each permit is an independent entity. Multiple permit acquires and reentrant lock acquires of a single endpoint are not equivalent. The only case where a semaphore behaves like a lock is the binary case, where the semaphore has only 1 permit. In this case, the semaphore works like a non-reentrant lock.All of the API methods in the new CP Semaphore implementation offer the exactly-once execution semantics for the session-aware version. For instance, even if a
release()call is internally retried because of a crashed Hazelcast member, the permit is released only once. However, this guarantee is not given for the sessionless, a.k.a, JDK-compatible CP Semaphore.- async init(permits: int) bool[source]
Tries to initialize this Semaphore instance with the given permit count.
- Parameters:
permits – The given permit count.
- Returns:
Trueif the initialization succeeds,Falseif already initialized.- Raises:
AssertionError – If the
permitsis negative.
- async acquire(permits: int = 1) None[source]
Acquires the given number of permits if they are available, and returns immediately, reducing the number of available permits by the given amount.
If insufficient permits are available then the result of the returned future is not set until one of the following things happens:
Some other caller invokes one of the
releasemethods for this semaphore, the current caller is next to be assigned permits and the number of available permits satisfies this request,This Semaphore instance is destroyed
- Parameters:
permits – Optional number of permits to acquire; defaults to
1when not specified- Raises:
AssertionError – If the
permitsis not positive.
- async available_permits() int[source]
Returns the current number of permits currently available in this semaphore.
This method is typically used for debugging and testing purposes.
- Returns:
The number of permits available in this semaphore.
- async drain_permits() int[source]
Acquires and returns all permits that are available at invocation time.
- Returns:
The number of permits drained.
- async reduce_permits(reduction: int) None[source]
Reduces the number of available permits by the indicated amount.
This method differs from
acquireas it does not block until permits become available. Similarly, if the caller has acquired some permits, they are not released with this call.- Parameters:
reduction – The number of permits to reduce.
- Raises:
AssertionError – If the
reductionis negative.
- async increase_permits(increase: int) None[source]
Increases the number of available permits by the indicated amount.
If there are some callers waiting for permits to become available, they will be notified. Moreover, if the caller has acquired some permits, they are not released with this call.
- Parameters:
increase – The number of permits to increase.
- Raises:
AssertionError – If
increaseis negative.
- async release(permits: int = 1) None[source]
Releases the given number of permits and increases the number of available permits by that amount.
If some callers in the cluster are blocked for acquiring permits, they will be notified.
If the underlying Semaphore implementation is non-JDK-compatible (configured via
jdk-compatibleserver-side setting), then a client can only release a permit which it has acquired before. In other words, a client cannot release a permit without acquiring it first.Otherwise, which means the underlying implementation is JDK compatible (configured via
jdk-compatibleserver-side setting), there is no requirement that a client that releases a permit must have acquired that permit by calling one of theacquire()methods. A client can freely release a permit without acquiring it first. In this case, correct usage of a semaphore is established by programming convention in the application.- Parameters:
permits – Optional number of permits to release; defaults to
1when not specified.- Raises:
AssertionError – If the
permitsis not positive.IllegalStateError – if the Semaphore is non-JDK-compatible and the caller does not have a permit
- async try_acquire(permits: int = 1, timeout: float = 0) bool[source]
Acquires the given number of permits and returns
True, if they become available during the given waiting time.If permits are acquired, the number of available permits in the Semaphore instance is also reduced by the given amount.
If no sufficient permits are available, then the result of the returned future is not set until one of the following things happens:
Permits are released by other callers, the current caller is next to be assigned permits and the number of available permits satisfies this request
The specified waiting time elapses
- Parameters:
permits – The number of permits to acquire; defaults to
1when not specified.timeout – Optional timeout in seconds to wait for the permits; when it’s not specified the operation will return immediately after the acquire attempt.
- Returns:
Trueif all permits were acquired,Falseif the waiting time elapsed before all permits could be acquired- Raises:
AssertionError – If the
permitsis not positive.
- class Set(service_name, name, context)[source]
Bases:
PartitionSpecificProxy,Generic[ItemType]Concurrent, distributed implementation of Set.
Example
>>> my_set = await client.get_set("my_set") >>> print("set.add", await my_set.add("item")) >>> print("set.size", await my_set.size())
Warning
Asyncio client set proxy is not thread-safe, do not access it from other threads.
- async add(item: ItemType) bool[source]
Adds the specified item if it is not exists in this set.
- Parameters:
item – The specified item to be added.
- Returns:
Trueif this set is changed after call,Falseotherwise.
- async add_all(items: Sequence[ItemType]) bool[source]
Adds the elements in the specified collection if they’re not exist in this set.
- Parameters:
items – Collection which includes the items to be added.
- Returns:
Trueif this set is changed after call,Falseotherwise.
- async add_listener(include_value: bool = False, item_added_func: Callable[[ItemEvent[ItemType]], None] = None, item_removed_func: Callable[[ItemEvent[ItemType]], None] = None) str[source]
Adds an item listener for this container.
Listener will be notified for all container add/remove events.
- Parameters:
include_value – Whether received events include the updated item or not.
item_added_func – Function to be called when an item is added to this set.
item_removed_func – Function to be called when an item is deleted from this set.
- Returns:
A registration id which is used as a key to remove the listener.
- async contains(item: ItemType) bool[source]
Determines whether this set contains the specified item or not.
- Parameters:
item – The specified item to be searched.
- Returns:
Trueif the specified item exists in this set,Falseotherwise.
- async contains_all(items: Sequence[ItemType]) bool[source]
Determines whether this set contains all items in the specified collection or not.
- Parameters:
items – The specified collection which includes the items to be searched.
- Returns:
Trueif all the items in the specified collection exist in this set,Falseotherwise.
- async get_all() List[ItemType][source]
Returns all the items in the set.
- Returns:
List of the items in this set.
- async is_empty() bool[source]
Determines whether this set is empty or not.
- Returns:
Trueif this set is empty,Falseotherwise.
- async remove(item: ItemType) bool[source]
Removes the specified element from the set if it exists.
- Parameters:
item – The specified element to be removed.
- Returns:
Trueif the specified element exists in this set,Falseotherwise.
- async remove_all(items: Sequence[ItemType]) bool[source]
Removes all of the elements of the specified collection from this set.
- Parameters:
items – The specified collection.
- Returns:
Trueif the call changed this set,Falseotherwise.
- async remove_listener(registration_id: str) bool[source]
Removes the specified item listener.
Returns silently if the specified listener was not added before.
- Parameters:
registration_id – Id of the listener to be deleted.
- Returns:
Trueif the item listener is removed,Falseotherwise.
- async retain_all(items: Sequence[ItemType]) bool[source]
Removes the items which are not contained in the specified collection.
In other words, only the items that are contained in the specified collection will be retained.
- Parameters:
items – Collection which includes the elements to be retained in this set.
- Returns:
Trueif this set changed as a result of the call,Falseotherwise.
- class VectorCollection(service_name, name, context)[source]
Bases:
Proxy,Generic[KeyType,ValueType]VectorCollection contains documents with vectors.
Concurrent, distributed, observable and searchable vector collection.
The configuration of the vector collection must exist before it can be used.
Example
>>> await client.create_vector_collection_config("my_vc", [ >>> IndexConfig(name="default-vector", metric=Metric.COSINE, dimension=2) >>> ] >>> my_vc = await client.get_vector_collection("my_vc") >>> await my_vc.set("key1", Vector("default-vector", Type.DENSE, [0.1, 0.2])
Warning
Asyncio client vector collection proxy is not thread-safe, do not access it from other threads.
Warning
Asyncio client is BETA. Its public API may change until General Availability release.
- async get(key: Any) Document | None[source]
Returns the Document for the specified key, or
Noneif this VectorCollection does not contain this key.Warning
This method returns a clone of the original Document. Modifying the returned Document does not change the actual Document in the VectorCollection. Put the modified Document back to make changes visible to all nodes.
>>> doc = await my_vc.get(key) >>> doc.value.update_some_property() >>> await my_vc.set(key, doc)
Warning
This method uses
__hash__and__eq__methods of binary form of the key, not the actual implementations of__hash__and__eq__defined in the key’s class.- Parameters:
key – The specified key.
- Returns:
The Document for the specified key or
Noneif there was no mapping for key.
- async set(key: Any, document: Document) None[source]
Sets a document for the given key in the VectorCollection.
Similar to the put operation except that set doesn’t return the old document, which is more efficient.
Warning
This method uses
__hash__and__eq__methods of binary form of the key, not the actual implementations of__hash__and__eq__defined in key’s class.- Parameters:
key – Key of the entry.
document – Document of the entry.
- async put(key: Any, document: Document) Document | None[source]
Associates the specified Document with the specified key in this VectorCollection.
If the VectorCollection previously contained a mapping for the key, the old Document is replaced by the specified Document. If the previous value is not needed, using the
setmethod is more efficient.Warning
This method returns a clone of the previous Document, not the original (identically equal) Document previously put into the VectorCollection.
Warning
This method uses
__hash__and__eq__methods of binary form of the key, not the actual implementations of__hash__and__eq__defined in the key’s class.- Parameters:
key – Key of the entry.
document – Document of the entry.
- Returns:
Previous Document associated with the key or
Noneif there was no mapping for the key.
- async put_all(map: Dict[Any, Document]) None[source]
Copies all the mappings from the specified dictionary to this VectorCollection.
No atomicity guarantees are given. In the case of a failure, some key-document tuples may get written, while others are not.
- Parameters:
map – Dictionary which includes mappings to be stored in this VectorCollection.
- async put_if_absent(key: Any, document: Document) Document | None[source]
Associates the specified key with the given Document if it is not already associated.
Warning
This method returns a clone of the previous Document, not the original (identically equal) Document previously put into the VectorCollection.
Warning
This method uses
__hash__and__eq__methods of binary form of the key, not the actual implementations of__hash__and__eq__defined in key’s class.- Parameters:
key – Key of the entry.
document – Document of the entry.
- Returns:
Old Document for the given key or
Noneif there is not one.
- async search_near_vector(vector: Vector, *, include_value: bool = False, include_vectors: bool = False, limit: int = 10, hints: Dict[str, str] = None) List[SearchResult][source]
Returns the Documents closest to the given vector.
The search is performed using the distance metric set when creating the vector index.
- Parameters:
vector – The vector to be used as the reference. It must have the same dimension as specified when creating the vector index.
include_value – Return value attached to the Document.
include_vectors – Return vectors attached to the Document.
limit – Limit the maximum number of Documents returned. If not set,
10is used as the default limit.
- Returns:
List of search results.
- async remove(key: Any) Document | None[source]
Removes the mapping for a key from this VectorCollection if it is present (optional operation).
The VectorCollection will not contain a mapping for the specified key once the call returns.
Warning
This method uses
__hash__and__eq__methods of binary form of the key, not the actual implementations of__hash__and__eq__defined in the key’s class.- Parameters:
key – Key of the mapping to be deleted.
- Returns:
The Document associated with key, or
Noneif there was no mapping for key.
- async delete(key: Any) None[source]
Removes the mapping for a key from this VectorCollection if it is present (optional operation).
Unlike remove(object), this operation does not return the removed Document, which avoids the serialization cost of the returned Document. If the removed Document will not be used, a delete operation is preferred over a remove operation for better performance.
The VectorCollection will not contain a mapping for the specified key once the call returns.
- Parameters:
key – Key of the mapping to be deleted.
- async optimize(index_name: str = None) None[source]
Optimize index by fully removing nodes marked for deletion, trimming neighbor sets to the advertised degree, and updating the entry node as necessary.
Warning
This operation can take a long time to execute and consume a lot of server resources.
- Parameters:
index_name – Name of the index to optimize. If not specified, the only index defined for the collection will be used. Must be specified if the collection has more than one index.