ReliableTopic

class ReliableMessageListener(*args, **kwds)[source]

Bases: Generic[MessageType]

A message listener for ReliableTopic.

A message listener will not be called concurrently (provided that it’s not registered twice). So there is no need to synchronize access to the state it reads or writes.

If a regular function is registered on a reliable topic, the message listener works fine, but it can’t do much more than listen to messages.

ReliableMessageListener is an enhanced version of such a function listener that integrates more closely with the reliable topic.

Durable Subscription

The ReliableMessageListener allows you to control where you want to start processing a message when the listener is registered. This makes it possible to create a durable subscription by storing the sequence of the last message and using this as the sequence id to start from.

Error handling

The ReliableMessageListener also gives the ability to deal with errors using the is_terminal() method. If a plain function is used, then it won’t terminate on errors and it will keep on running. But in some cases it is better to stop running.

Global order

The ReliableMessageListener will always get all events in order (global order). It will not get duplicates and there will only be gaps if it is too slow. For more information see is_loss_tolerant().

Delivery guarantees

Because the ReliableMessageListener controls which item it wants to continue from upon restart, it is easy to provide an at-least-once or at-most-once delivery guarantee. store_sequence() is always called before a message is processed, so the sequence can be persisted to non-volatile storage. When retrieve_initial_sequence() returns the stored sequence, the message with that sequence is processed again after a restart, which gives at-least-once delivery. To implement an at-most-once delivery guarantee, return the stored sequence plus 1 from retrieve_initial_sequence().
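The two guarantees can be sketched as follows. This is a minimal illustration, not the client's own code: the class name DurableListener, the store dict (a stand-in for non-volatile storage), and the at_least_once flag are all hypothetical, and the try/except import only lets the sketch run where the client is not installed.

```python
try:
    from hazelcast.proxy.reliable_topic import ReliableMessageListener
except ImportError:  # fallback so the sketch runs without the client installed
    ReliableMessageListener = object


class DurableListener(ReliableMessageListener):
    """Hypothetical listener that resumes from a persisted sequence."""

    def __init__(self, store, at_least_once=True):
        self._store = store  # stand-in for non-volatile storage
        self._at_least_once = at_least_once
        self.received = []

    def on_message(self, message):
        self.received.append(message)

    def retrieve_initial_sequence(self):
        last = self._store.get("sequence")
        if last is None:
            return -1  # nothing stored: start from the next published message
        # Returning the stored sequence replays the last message
        # (at-least-once); adding 1 skips past it (at-most-once).
        return last if self._at_least_once else last + 1

    def store_sequence(self, sequence):
        self._store["sequence"] = sequence

    def is_loss_tolerant(self):
        return False

    def is_terminal(self, error):
        return True
```

With a real persistent store in place of the dict, the same instance logic would survive a process restart; the dict here only demonstrates which sequence each mode returns.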

on_message(message: TopicMessage[MessageType]) None[source]

Invoked when a message is received for the added reliable topic.

One should not block in this callback. If blocking is necessary, consider delegating that task to an executor or a thread pool.

Parameters

message – The message that is received for the topic
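One way to avoid blocking in on_message() is to hand the work to a thread pool, sketched here with the standard library's ThreadPoolExecutor. The names OffloadingListener and handler are hypothetical, the message is treated as an opaque object, and a real listener would also subclass ReliableMessageListener and implement its remaining methods.

```python
from concurrent.futures import ThreadPoolExecutor


class OffloadingListener:
    """Hypothetical on_message() body that delegates slow work to a pool."""

    def __init__(self, handler, max_workers=1):
        self._handler = handler
        # A single worker keeps handlers in submission order.
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self.pending = []

    def on_message(self, message):
        # Return immediately; the potentially slow handler runs on a worker.
        self.pending.append(self._executor.submit(self._handler, message))

    def close(self):
        # Wait for queued work to finish before shutting down.
        self._executor.shutdown(wait=True)
```

With more than one worker, handlers may run concurrently and out of order, so the handler would then need its own synchronization.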

retrieve_initial_sequence() int[source]

Retrieves the initial sequence from which this ReliableMessageListener should start.

Return -1 if there is no initial sequence and you want to start from the next published message.

If you intend to create a durable subscriber so you continue from where you stopped the previous time, load the previous sequence and add 1. If you don’t add one, then you will be receiving the same message twice.

Returns

The initial sequence.

store_sequence(sequence: int) None[source]

Informs the ReliableMessageListener that it should store the sequence. This method is called before the message is processed. Can be used to make a durable subscription.

Parameters

sequence – The sequence.

is_loss_tolerant() bool[source]

Checks if this ReliableMessageListener is able to deal with message loss. Even though the reliable topic promises to be reliable, it can be that a ReliableMessageListener is too slow. Eventually the message won’t be available anymore.

If the ReliableMessageListener is not loss tolerant and the topic detects that there are missing messages, it will terminate the ReliableMessageListener.

Returns

True if the ReliableMessageListener is tolerant towards losing messages.

is_terminal(error: Exception) bool[source]

Checks if the ReliableMessageListener should be terminated based on an error raised while calling on_message().

Parameters

error – The error raised while calling on_message()

Returns

True if the ReliableMessageListener should terminate itself, False if it should keep on running.
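As an illustration of is_loss_tolerant() and is_terminal() working together, the sketch below keeps running after a ValueError but terminates on any other error, and accepts gaps in the message stream. The class name SelectiveListener and the choice of ValueError are assumptions made for the example; the try/except import only lets it run where the client is not installed.

```python
try:
    from hazelcast.proxy.reliable_topic import ReliableMessageListener
except ImportError:  # fallback so the sketch runs without the client installed
    ReliableMessageListener = object


class SelectiveListener(ReliableMessageListener):
    """Hypothetical listener: survive bad payloads, stop on anything else."""

    def __init__(self):
        self.seen = []

    def on_message(self, message):
        self.seen.append(message)  # message is treated as opaque here

    def retrieve_initial_sequence(self):
        return -1  # start from the next published message

    def store_sequence(self, sequence):
        pass  # no durable subscription in this sketch

    def is_loss_tolerant(self):
        # Accept gaps instead of being terminated when messages are lost.
        return True

    def is_terminal(self, error):
        # Keep running after a ValueError; terminate on any other error.
        return not isinstance(error, ValueError)
```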

class ReliableTopic(service_name, name, context)[source]

Bases: Proxy[BlockingReliableTopic], Generic[MessageType]

Hazelcast provides a distribution mechanism for publishing messages that are delivered to multiple subscribers, which is also known as a publish/subscribe (pub/sub) messaging model. Publishing and subscriptions are cluster-wide. When a member subscribes to a topic, it is actually registering for messages published by any member in the cluster, including members that join after the listener is added.

Messages are ordered, meaning that listeners (subscribers) will process the messages in the order they are actually published.

Hazelcast’s Reliable Topic uses the same Topic interface as a regular topic. The main difference is that Reliable Topic is backed up by the Ringbuffer data structure, a replicated but not partitioned data structure that stores its data in a ring-like structure.

publish(message: MessageType) Future[None][source]

Publishes the message to all subscribers of this topic.

Parameters

message – The message.

publish_all(messages: Sequence[MessageType]) Future[None][source]

Publishes all messages to all subscribers of this topic.

Parameters

messages – Messages to publish.
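A short usage sketch for publish() and publish_all() follows. The function name publish_greetings and the message values are hypothetical; topic is assumed to be a ReliableTopic proxy, for example one obtained with get_reliable_topic() on a connected HazelcastClient.

```python
def publish_greetings(topic):
    """Publish one message, then a batch, waiting for each call to complete.

    `topic` is assumed to be a ReliableTopic proxy (hypothetical wiring:
    client.get_reliable_topic("greetings") on a connected client).
    """
    # Both calls return a Future; result() blocks until the publish completes.
    topic.publish("hello").result()
    topic.publish_all(["how", "are", "you"]).result()
```

Calling result() after every publish is the simplest form; the futures could instead be collected and awaited together if throughput matters.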

add_listener(listener: Union[ReliableMessageListener, Callable[[TopicMessage[MessageType]], None]]) Future[str][source]

Subscribes to this reliable topic.

The listener can be either a simple function or an instance of a ReliableMessageListener. When a function is passed, a ReliableMessageListener is created out of that with sensible default values.

When a message is published, the ReliableMessageListener.on_message() method of the given listener (or the function passed) is called.

More than one message listener can be added on one instance.

Parameters

listener – Listener to add.

Returns

The registration id.
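Registering a plain function might look like the sketch below. The helper name collect_payloads and the sink list are hypothetical, and the sketch assumes the payload is exposed on the TopicMessage as its message attribute.

```python
def collect_payloads(topic, sink):
    """Register a plain function as a listener and return its registration id."""

    def on_message(topic_message):
        # Assumption: the published payload is available as `.message`.
        sink.append(topic_message.message)

    # add_listener() returns a Future that resolves to the registration id.
    return topic.add_listener(on_message).result()
```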

remove_listener(registration_id: str) Future[bool][source]

Stops receiving messages for the given message listener.

If the given listener is already removed, this method does nothing.

Parameters

registration_id – ID of listener registration.

Returns

True if registration is removed, False otherwise.
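To make sure a registration is always released, the add and remove calls can be paired in a context manager. This is only one possible pattern; the name subscribed is hypothetical and topic is assumed to be a non-blocking ReliableTopic proxy.

```python
from contextlib import contextmanager


@contextmanager
def subscribed(topic, listener):
    """Hypothetical helper: register on entry, unregister on exit."""
    registration_id = topic.add_listener(listener).result()
    try:
        yield registration_id
    finally:
        # Runs even if the body raises, so the listener is not leaked.
        topic.remove_listener(registration_id).result()
```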

destroy() bool[source]

Destroys the underlying Proxy and Ringbuffer instances.

blocking() BlockingReliableTopic[MessageType][source]

Returns a version of this proxy with only blocking method calls.
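The difference between the two proxies can be sketched as follows: on the blocking version, calls return once the operation has completed, so no result() call is needed. The function name publish_blocking is hypothetical.

```python
def publish_blocking(topic, messages):
    """Publish each message through the blocking proxy and return the count."""
    blocking_topic = topic.blocking()
    count = 0
    for message in messages:
        # Returns None once the publish has completed; no Future involved.
        blocking_topic.publish(message)
        count += 1
    return count
```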

class BlockingReliableTopic(wrapped: ReliableTopic[MessageType])[source]

Bases: ReliableTopic[MessageType]

name
service_name
publish(message: MessageType) None[source]

Publishes the message to all subscribers of this topic.

Parameters

message – The message.

publish_all(messages: Sequence[MessageType]) None[source]

Publishes all messages to all subscribers of this topic.

Parameters

messages – Messages to publish.

add_listener(listener: Union[ReliableMessageListener, Callable[[TopicMessage[MessageType]], None]]) str[source]

Subscribes to this reliable topic.

The listener can be either a simple function or an instance of a ReliableMessageListener. When a function is passed, a ReliableMessageListener is created out of that with sensible default values.

When a message is published, the ReliableMessageListener.on_message() method of the given listener (or the function passed) is called.

More than one message listener can be added on one instance.

Parameters

listener – Listener to add.

Returns

The registration id.

remove_listener(registration_id: str) bool[source]

Stops receiving messages for the given message listener.

If the given listener is already removed, this method does nothing.

Parameters

registration_id – ID of listener registration.

Returns

True if registration is removed, False otherwise.

destroy() bool[source]

Destroys the underlying Proxy and Ringbuffer instances.

blocking() BlockingReliableTopic[MessageType][source]

Returns a version of this proxy with only blocking method calls.