Scaling out Total Order Atomic Broadcast with Apache BookKeeper

December 14, 2017

Ivan Kelly

Total Order Atomic Broadcast (TOAB) is an extremely useful property for a distributed system to have. Replicated state machines require it, and you’ll need it if you want to build a consistent messaging service. While not strictly required, in practice TOAB will be necessary if you want to provide effectively-once and at-least-once semantics in a stream processing system.

A system has TOAB if it exhibits the following properties (copy-pasted from Wikipedia):

Property Description
Validity If a correct participant broadcasts a message, then all correct participants will eventually deliver it.
Uniform Agreement If one correct participant delivers a message, then all correct participants will eventually deliver that message.
Uniform Integrity A message is delivered by each participant at most once, and only if it was previously broadcast.
Uniform Total Order The messages are totally ordered in the mathematical sense; that is, if any correct participant delivers message 1 first and message 2 second, then every other correct participant must deliver message 1 before message 2.

In more concise terms, TOAB means:

  • no loss of messages
  • no duplication
  • no reordering

TOAB is known to be equivalent to consensus in distributed systems.

There are a number of well-known open source projects that implement TOAB (or stronger variants), in particular ZooKeeper, etcd, and Consul. For various reasons, however, these implementations cannot scale out because they provide only a single replicated log. Adding a new node to the system just increases the communication needed to broadcast one message. Needless to say, this approach is not scalable.

Enter BookKeeper

Apache BookKeeper is a system that allows you to take the TOAB primitive provided by ZooKeeper and scale it out horizontally in such a way that you can have unlimited streams that maintain the TOAB properties listed above. The remainder of this blog post describes how BookKeeper achieves this.

BookKeeper provides a log storage building block known as a ledger.  A ledger can be thought of as a log segment, which is part of a larger log. A ledger is written to a subset of the storage nodes (aka bookies) in the cluster. This subset is the ledger’s ensemble. Each entry of the ledger has a write quorum, which is the nodes of the ensemble to which it is written (usually all) and an ack quorum, which is the number of bookies that must respond for that entry to be acknowledged (usually a majority). Both the write and ack quorum are configurable parameters in BookKeeper.

A ledger only ever has one writer that can add entries to it. This makes it simple to provide both Uniform Total Order and Uniform Integrity, as the writer assigns an entry ID to each entry of the ledger.

Validity and Uniform Agreement are little bit more complex to provide. For Validity we need to ensure that if an entry has been acknowledged to the writer, anyone who tries to read from that ledger will read that entry. Uniform Agreement requires that all future readers of the ledger will see exactly the same set of messages.

While the writer is still active, providing these properties is fairly straightforward. The writer knows which entries have been acknowledged by an ack quorum and it sends this information to the bookies in the ensemble, either as an explicit message or piggybacked on subsequent entries. This information is called the last add confirmed (LAC) and is analogous to the commit phase in Zab or Raft. Readers of the ledger can read up as far as the LAC, with the guarantee that any entry they read will eventually be readable to all other readers.

Handling write crashes

Things get trickier when the writer crashes. The writer may have written entries which have been:

  1. Acknowledged by an ack quorum, but not sent as an LAC.
  2. Persisted by an ack quorum, but the acknowledgements from the bookies never reached the writer.
  3. Persisted in fewer bookies than an ack quorum.

Problematically, these three cases are impossible to distinguish from one another in the presence of failures. To provide Validity we must ensure that all readers can read the entries from case 1. To provide Uniform Agreement we must ensure that all readers agree which entries to keep from cases 2 and 3.

This is done by “closing” the ledger. Closing a ledger decides which entry is the last entry of a ledger and writes this decision, using consensus, so that that decision is immutable for all time.

Multiple readers may try to close a ledger concurrently. Each reader uses a recovery mechanism to figure out what the last entry of the ledger could be. There is no guarantee that each reader will receive the same last entry from the recovery mechanism, as the bookies that each reader can communicate with could differ. However, it is guaranteed that all readers will at least see the last entry which was acknowledged by the writer, as the recovery mechanism reads from all bookies in the write quorum, and halts if it does not hear back from at least one bookie in each possible ack quorum.

To resolve the problem of different readers getting different results from recovery, each reader then tries to write its result to ZooKeeper using a compare-and-swap. Only one of these writes can succeed; those that fail can then read back the successful result to know the correct end of the ledger.

Figure 1. Two readers, **R1** and **R2**, recovering ledger **1**. **R1** can see entry **3**, but **R2** cannot, so recovery on **R1** will suggest a different last entry from recovery on **R2**
Figure 1. Two readers, R1 and R2, recovering ledger 1. R1 can see entry 3, but R2 cannot, so recovery on R1 will suggest a different last entry from recovery on R2

Rounding out the properties

We also need to take into account that there are no perfect failure detectors. This means that a reader may try to recover and close a ledger while the writer is actually still alive. The writer may have been partitioned from the network for a while, so the reader suspected it was dead and acted on that suspicion.

For Validity, we need to ensure that once a reader starts recovering a ledger, the writer cannot add any more entries, because if it can add more entries, it could add and acknowledge entries after the last entry has been decided and no reader would ever see these entries. To solve this issue, BookKeeper has a fencing mechanism that runs at the start of recovery. A “fence” message is sent to each bookie in the ledger’s write quorum, which tells the bookie not to acknowledge any new messages for that ledger. Once the “fence” message is acknowledged by at least one bookie in each possible ack quorum, we know that no new entry from the writer can be acknowledged by an ack quorum of bookies, and thus no new entry can be acknowledged by the writer.

Fencing, along with the recovery mechanism, gives us the Validity property. The final compare-and-swap set write to ZooKeeper, setting the last entry of the ledger immutably, gives us Uniform Agreement, thus completing the set of properties required for a ledger to provide Total Order Atomic Broadcast.

Multiple ledgers

A single ledger on its own is limited in its usefulness. As it only has a single writer, you cannot add anything else to the ledger if that writer crashes. But to be useful as a replicated log for messaging or state machine replication, you need to be able to re-open the log after a crash and start adding entries again.

This is generally accomplished using ZooKeeper. Here I will describe how Apache Pulsar maintains a message log for a topic, but the approach is general and can be used in many other applications.

We don’t recommend that you implement this pattern yourself. We recommend using DistributedLog, which implements it for you, instead. DistributedLog is a high-level client for BookKeeper that abstracts away some of the trickier aspects of working with BookKeeper.

TOAB at work: Apache Pulsar topics

In Pulsar, the message log for a topic consists of a sequence of ledgers. This sequence of ledgers is stored in ZooKeeper. When a Pulsar broker acquires a topic, it reads the sequence of ledgers for the topic from ZooKeeper. Before it can allow messages to be published to the topic, it ensures that the last ledger in the sequence is closed. It then creates a new ledger, adds it to the sequence, and writes the sequence back to ZooKeeper using a compare and set write.

Figure 2. A sequence of ledgers in a Pulsar topic's BookKeeper message log
Figure 2. A sequence of ledgers in a Pulsar topic’s BookKeeper message log

By storing the sequence of ledgers in ZooKeeper, we get the TOAB guarantees on the sequence. Combined with the TOAB guarantees provided by the ledgers themselves, this means that we have TOAB on Pulsar topics. And because DistributedLog uses the same pattern, DistributedLog is also able to provide TOAB.

BookKeeper: a scalable solution

Each ledger that is created is balanced across the set of bookies available at that time. The more bookies you have, the more ledgers you can write to at a given time. This is the very definition of horizontal scalability.

ZooKeeper only needs to be written to when closing a ledger or adding a new ledger to a log, while the entries of the ledger are distributed across the available bookie for that ledger. If you store 10 entries per ledger, BookKeeper will have a scalability factor that’s 10x that of ZooKeeper; if you store 1000 entries per ledger, the scalability factor will increase to 1000x vis-à-vis ZooKeeper.

BookKeeper scalability is a function of both the number of entries per ledger and the number of bookies available, both of which can be easily tuned by the user. For example, if you run a large production cluster of several hundred bookies, managing ~2 million concurrent streams, TOAB guarantees will be provided across all bookies in the cluster. BookKeeper is the only open source system available that can provide such guarantees.


If you’re interested in BookKeeper or DistributedLog, you may want to join the BookKeeper community via the BookKeeper mailing list or the BookKeeper Slack channel. You can also download the latest release from here to get started.

For more information about the Apache BookKeeper project in general, please visit the official website at bookkeeper.apache.org and follow the project on Twitter @asfbookkeeper and @distributedlog.