Effectively-once semantics in Apache Pulsar

February 20, 2018

Matteo Merli

Messaging systems typically offer one or more of three different messaging guarantees: at-most-once, at-least-once, and so-called exactly-once.

At-most-once and at-least-once guarantees are widely understood amongst software architects and their definitions are generally clear and consistent across all implementations. There are, however, a few caveats. There’s disagreement surrounding, for example, what kind of durable storage system should be used to back at-least-once guarantees. But that disagreement tends to be practical, not conceptual.

With exactly-once guarantees, however, things are different. There is currently no clear agreement in the industry, in either practical or theoretical terms, regarding what exactly-once really means. Almost every messaging system vendor has claimed that they support “exactly-once,” yet there’s a great deal of variance in what guarantees are actually provided.

There has also been a lot of discussion about whether exactly-once is even possible or how it compares with distributed systems concepts like atomic broadcast and consensus, which are concepts with a well-established formal definition. In this post, I’d like to steer away from the controversy and focus on more practical concerns, specifically on how developers can create applications that process streaming data in an “exact” fashion.

Here at Streamlio, we prefer to use the term effectively-once over “exactly-once” because it highlights the fact that a messaging system needs to be able to detect and discard duplicate messages and to do so with a 100% degree of accuracy. We call this feature message deduplication.

With effectively-once guarantees, a message can be received and processed more than once, which is common in the presence of failures. The crucial thing, though, is that the effects on the resulting state will be observed only once. In other words, “exactly-once” refers to how messages are processed (which your application doesn’t care about) while effectively-once refers to the actual outcome of processing (which your application often cares a great deal about).

Guaranteeing message deduplication

In order to ensure that messages can be processed only once, even in the presence of any combination and sequence of failures, we need to break the problem into two parts:

  1. Ensuring that the message is published only once, or producer idempotency
  2. Allowing the application to account for the effect of a message on the resulting state just once (when counting different types of events, for example, the observed result should always be correct)

Producer idempotency

With producer idempotency, we mean that the system as a whole needs to be able to identify and discard messages that have already been published and are being retransmitted. Retransmissions, in distributed systems, can happen for a variety of reasons:

  • A Pulsar broker, which is responsible for handling messages, has crashed
  • Network issues prevented the message from being delivered to the proper recipient
  • A message-producing client crashed and needed to send the message again when it came back online

In all of these cases, a producer application might have sent some messages but then didn’t receive a successful response from the broker. The normal reaction, inside the Pulsar client library, is to re-send these messages to make sure that they were indeed published.

Since release 1.20, however, Pulsar has supported message deduplication at the system level. This can be enabled in the namespace configuration. Here’s an example:

$ pulsar-admin namespaces set-deduplication $MY_NAMESPACE --enable

With this setting, Pulsar brokers will ensure that duplicated messages will be discarded rather than persisted.

Pulsar’s broker-level deduplication logic is based on a record-keeping system. Each broker keeps track of the last “successfully” published message ID for each individual message producer. While this information is usually accessed and updated in memory in the critical path, Pulsar also takes periodic snapshots of this state and stores them in a durable and replicated fashion. The snapshot, combined with the log of messages published after it was taken, ensure the same exact state can be reconstructed after a broker crash.

Pulsar stores the snapshot of the producer’s sequence IDs using a “cursor”. Cursors are already part of Pulsar, serving as the basis of Pulsar topic subscriptions. A broker will keep track, in memory, of the last published sequence ID for each producer and will take a snapshot of this information every N messages. A snapshot will contain the producer -> last sequence ID mapping and it will reflect the state at a particular position in the stream. The cursor is then moved to that position, with the snapshot atomically associated with it.

When recovering, any other broker will be able to reconstruct the same exact state by loading the last snapshot and applying all the changes from that position on.

One of the advantages of this approach is that it has very low overhead (just one in-memory hashmap lookup) in the critical path when performing the deduplication task.

Addressing producer application crashes

So far, we’ve been talking about detecting duplicates when there are network issues or when Pulsar brokers are crashing. Let’s say that we create a producer for a given Pulsar topic (in Java):

Producer producer = client.createProducer(TOPIC_NAME, conf);

When creating a producer instance the system will assign a unique name to it, and this producer will start publishing messages starting from the message with a sequence ID of 0. Throughout the lifecycle of this producer instance, the deduplication will be honored, but what happens if there’s a crash and a new producer is created for the topic? In this case, Pulsar won’t be able to recognize the new producer as related to the previous one since they will have a different producer name.

To ensure continuity, we need to make sure we’re using the same producer name:

ProducerConfiguration conf = new ProducerConfiguration();
conf.setProducerName("my-producer");
conf.setSendTimeout(0, TimeUnit.SECONDS);
Producer producer = client.createProducer(TOPIC_NAME, conf);

With this simple change, when we restart the producer, it will automatically discover the last successfully published sequence ID from the previous instance of the same producer, and it will restart publishing from that sequence ID.

To access the last published sequence ID after the producer has been created:

long lastSequenceId = producer.getLastSequenceId();

We also have to set the send timeout to 0—meaning an “infinite” timeout—because we cannot stop retrying if we want to make sure the message is processed effectively, or else we would fall back on the at-most-once field.

While this mechanism will prevent most duplicates in the presence of crashes, some help from the application is nonetheless needed to achieve 100% coverage. Specifically, the sequence ID attached to each message needs to be associated with some application-specific property.

Effectively-once publishing in practice only makes sense when the messages are coming from a replayable source as opposed to a non-replayable source (for example online HTTP requests). For non-replayable sources, there’s no way to re-send the previous pending messages after a crash.

The most simple example of a “replayable” source is a file (or a sequence of files). For example, let’s say we are reading records from a file and publishing a message for each record we read. If this application crashes and restarts, we want to resume publishing from record next to the last successfully published record before the crash.

For this, we could use the record offset in the file as the sequence ID and, through that ID, recover which offset we need to read from after the crash:

ProducerConfiguration producerConf = new ProducerConfiguration();
producerConf.setProducerName("my-producer-name");
Producer producer = client.createProducer(topic, producerConf);

// Fictitious record reader class
RecordReader source = new RecordReader("/my/file/path");

long fileOffset = producer.getLastSequenceId();
if (fileOffset > 0) {
    source.seekToOffset(fileOffset);
}

while (source.hasNext()) {
    long currentOffset = source.currentOffset();
    Message msg = MessageBuilder.create()
        .setSequenceId(currentOffset)
        .setContent(source.next()).build();
   producer.send(msg);
}

Consuming messages only once

As we mentioned earlier, the crucial thing in effectively-once semantics is the ability to detect and discard duplicates at every step of the processing chain, and, as with the producer discussion, the most interesting part is always when crossing the boundaries of a particular system.

For consuming messages, Pulsar provides a very convenient consumer abstraction that works in conjunction with a topic subscription. Here’s an example:

Consumer consumer = client.subscribe(MY_TOPIC, MY_SUBSCRIPTION_NAME);

while (true) {
    Message msg = consumer.receive();
    // Process the message...
    consumer.acknowledge(msg);
}

With a consumer, the application doesn’t need to think about where it needs to resume consuming from. Pulsar will automatically resume delivering messages from the first unacknowledged message. This is usually very convenient, though it doesn’t leave room for finer control over where to restart the consumption.

An application that needs to process messages only once using the consumer will have to rely on some external system to perform the final deduplication. Here are some example deduplication mechanisms:

  • When writing into a relational DB, store the message ID (or some other application-specific property that is unique per message) along with the data stored.
  • Use application-specific sequence IDs and always perform critical writes when updating the state (for example compareAndSet()).

These options, though, are not always available or desirable in all circumstances.

The fundamental capability that is required for effectively-once consumption is to tie the act of processing the data and storing its transformed output with the act of “acknowledging” the message in a single atomic action.

Since the Consumer API is not apt for this, we have introduced the Reader concept in Pulsar. Instead of requiring a managed subscription, Readers let the application specify where to read from.

With the Reader API, the application can store the message ID associated with the last successfully processed message in an external system, for example along with the output of the data processed. After a crash, the application can recover the last successfully processed message ID and create a reader that will start reading from the very next message. Here’s an example:

MessageId lastMessageId = recoverLastMessageIdFromDB();
Reader reader = client.createReader(MY_TOPIC, lastMessageId,
                                    new ReaderConfiguration());

while (true) {
    Message msg = reader.readNext();
    byte[] msgId = msg.getMessageId().toByteArray();

    // Process the message and store msgId atomically
}

Performance considerations

We have done extensive testing and evaluation of the performance when the deduplication feature is enabled on a topic. We have found no measurable overhead in terms of throughput or latency degradation.

The principal reason for the lack of performance degradation is that Pulsar can guarantee ordering of published messages even when there is more than one outstanding message in flight from the producer to the broker.

The other reason why deduplication is such a light-weight feature is that in the critical path, when publishing a particular message, there is only one additional local in-memory hashmap lookup and update. The same information is then snapshotted periodically in the background.

Because we can leverage Pulsar’s cursor, the snapshot is associated with a particular position in the topic and it is updated atomically with the position. Additionally, we can make sure that all the messages published after the snapshot are retained and available for recovery.

The only practical overhead for deduplication is indeed the recovery time. When a broker needs to recover a certain topic after a crash, it will need to load the snapshot and replay the messages from the associated position to reconstruct the last sequence ID for each producer. The number of messages to replay can be configured using the brokerDeduplicationEntriesInterval parameter.

Conclusion

We have shown how Pulsar supports effectively-once semantics to achieve guaranteed end-to-end deduplication from producer to broker and to consumers, and even when adding geo-replication to the mix.

Message deduplication, especially in conjunction with the strong durability guarantees of Pulsar/BookKeeper (data is replicated and committed to stable storage before acknowledging), makes for a very powerful combination when processing critical data.