Access Patterns and Tiered Storage in Apache Pulsar

October 15, 2018

Ivan Kelly

We’ve spoken previously about how BookKeeper replication works and about the different I/O patterns in BookKeeper that Apache Pulsar takes advantage of. In this blog post, we’ll explore how replication interacts with the different I/O patterns and how this interaction allows Pulsar to do nifty things such as tiered storage. Pulsar’s architecture is, by its very nature, tiered, and it is this tiering that allows each I/O pattern to be served in isolation, so that reads will never negatively impact writes and vice versa. The tiering also made it very simple to add another tier of storage in a way that was fully integrated with Pulsar, providing the cost and scalability benefits of an additional tier of storage without any impact on developers.

Pulsar is a messaging system offering both Publish-Subscribe and Queuing semantics. A client can be either a producer or a consumer or both. A producing client sends messages to a broker. A consuming client consumes messages from a broker. Pulsar organizes messages into topics and assigns each topic to a broker. Within a topic, Pulsar guarantees Total Order Atomic Broadcast, meaning that once a Pulsar broker acknowledges the publication of a message in a topic to a producer, that message will never be lost, duplicated or reordered relative to other messages in the same topic. All consumers will see the exact same sequence of messages in the exact same order.

Diagram of message flow in Pulsar

Pulsar uses BookKeeper as a backing store for topic message backlogs. The Pulsar broker acts as a stateless serving layer on top of the BookKeeper store. When a producer sends a message to Pulsar [1], Pulsar immediately writes it to BookKeeper. Once BookKeeper acknowledges the write, the broker can acknowledge the message publication to the producer, and consumers can read the message.

There are generally three patterns of I/O in messaging systems.

  • Writes, where a message is published to the system;
  • Tailing reads, where the message is sent to active subscribers immediately after being written; and
  • Catchup reads, where a consumer reads a large number of messages from the suffix of the log to catch up. This occurs when a new consumer wants to start reading the backlog from a point earlier than the latest message, or when an existing consumer comes back after having been offline for an extended amount of time.

Unlike most other messaging systems, in Pulsar each of these I/O patterns is isolated from the other patterns.

Writes are the most interesting I/O pattern, from which all other patterns follow. When a Pulsar broker wants to persist a message for a topic, it writes it to a set of BookKeeper nodes, defined as the write quorum of that topic’s log. Each BookKeeper node that receives the message appends it to its journal file, which resides on a dedicated disk. When enough nodes have acknowledged the write to satisfy the replication requirement of the log (the ack quorum), the write is considered committed and is acknowledged to the producer. From this point on, the message, and the fact that it occupies that offset of the log, are immutable. No other message can ever occupy that offset, and the message itself can never change.
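The commit rule above can be illustrated with a minimal sketch. This is a toy model, not BookKeeper's client code: the `Bookie` class and `write_entry` function are hypothetical names, and the writes are issued sequentially here for simplicity.

```python
from dataclasses import dataclass, field


@dataclass
class Bookie:
    """Toy stand-in for a BookKeeper node: just an append-only journal."""
    name: str
    healthy: bool = True
    journal: list = field(default_factory=list)

    def append(self, entry_id: int, payload: bytes) -> bool:
        # An unhealthy node never acknowledges the write.
        if not self.healthy:
            return False
        self.journal.append((entry_id, payload))
        return True


def write_entry(write_quorum, ack_quorum: int, entry_id: int, payload: bytes) -> bool:
    """Send the entry to every node in the write quorum. The write counts as
    committed once at least ack_quorum nodes have acknowledged it."""
    acks = sum(1 for bookie in write_quorum if bookie.append(entry_id, payload))
    return acks >= ack_quorum


# Write quorum of 3, ack quorum of 2, with one node down.
bookies = [Bookie("bk-1"), Bookie("bk-2"), Bookie("bk-3", healthy=False)]
committed = write_entry(bookies, ack_quorum=2, entry_id=0, payload=b"hello")
```

With one of three nodes down, the write still commits because two acknowledgements satisfy the ack quorum. The same call with `ack_quorum=3` would not commit.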

This immutability can be exploited to serve the other I/O patterns of the messaging system efficiently. The write of the message in the journals of the BookKeeper nodes is canonical. We could stop here, and the message would still be accessible. However, this would not be very efficient, as each read would have to scan the whole journal to find the message it needed, and we would never be able to truncate the journal to free up disk space. Fortunately, the immutability of the committed message allows the message to be cached in multiple places to serve reads efficiently.

Cache-level view of Pulsar

The first level of cache is the Pulsar broker itself, and this is used to serve tailing reads. When the message is committed, it can be sent directly to all subscribers attached to that topic, without having to involve disks at all.

The next level of cache is the ledger-storage disk on BookKeeper nodes. When a message is written to the journal on the BookKeeper node, it is also written to a memory buffer that is periodically flushed to the ledger-storage disk. The BookKeeper node uses this disk to serve reads. In Pulsar it is rare that messages are read from the memory buffer. Tailing consumers typically read messages directly from Pulsar’s cache. Catchup consumers usually request messages that are too old to be in the memory buffer. The ledger-storage disk serves these catchup reads. The ledger-storage disk stores messages in a format that optimizes for the ability to store many different topics on the same disk while providing sequential reads within the same topic as much as possible. As the ledger-storage disk is separate from the journal disk, these reads do not interfere with the journal disk’s ability to provide sequential writes.
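The split between the journal and ledger storage can be sketched as follows. This is a simplified model under stated assumptions: `ToyBookie` and its methods are hypothetical, in-memory structures stand in for the two disks, and the buffer is flushed when it reaches a size threshold rather than periodically.

```python
from collections import defaultdict


class ToyBookie:
    """Toy model of a BookKeeper node with separate write and read paths."""

    def __init__(self, flush_threshold: int = 3):
        self.journal = []                        # interleaved, append-only (journal disk)
        self.write_buffer = []                   # in-memory copy awaiting flush
        self.ledger_storage = defaultdict(dict)  # topic -> {entry_id: payload} (ledger-storage disk)
        self.flush_threshold = flush_threshold

    def add_entry(self, topic: str, entry_id: int, payload: bytes) -> None:
        record = (topic, entry_id, payload)
        self.journal.append(record)       # sequential write, all topics interleaved
        self.write_buffer.append(record)  # same entry, kept in memory for now
        if len(self.write_buffer) >= self.flush_threshold:
            self.flush()

    def flush(self) -> None:
        # Sort by topic, then entry id, so each topic's entries land together.
        for topic, entry_id, payload in sorted(self.write_buffer, key=lambda r: (r[0], r[1])):
            self.ledger_storage[topic][entry_id] = payload
        self.write_buffer.clear()

    def read_entry(self, topic: str, entry_id: int):
        # Reads check the memory buffer, then ledger storage. The journal is never scanned.
        for t, e, payload in self.write_buffer:
            if (t, e) == (topic, entry_id):
                return payload
        return self.ledger_storage[topic].get(entry_id)


bookie = ToyBookie(flush_threshold=3)
bookie.add_entry("topic-a", 0, b"a0")
bookie.add_entry("topic-b", 0, b"b0")
bookie.add_entry("topic-a", 1, b"a1")  # third entry triggers a flush
bookie.add_entry("topic-b", 1, b"b1")  # stays in the memory buffer for now
```

The journal keeps entries in arrival order across topics, while ledger storage groups them by topic, which is what allows mostly sequential reads within a topic.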

The final level of cache is long-term storage, which is used if “Tiered Storage” is configured for Pulsar. Tiered Storage allows the user to move older parts of the topic backlog to cheaper forms of storage. Tiered Storage takes advantage of the immutability of messages, but at a larger granularity, as storing individual messages in long-term storage would be wasteful. Pulsar topic logs are composed of segments, each segment corresponding to a sequence of 50000 messages by default. There is only ever one segment active at a time. All previous segments are closed. When a segment is closed, no new messages can be added to it. Given that the individual messages in the segment are immutable, and their offsets are immutable, it follows that the segment as a whole is therefore immutable. And we can copy an immutable object anywhere we like.
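A minimal sketch of a segmented log makes the "one active segment, all others closed" rule concrete. The `Segment` and `SegmentedLog` names are hypothetical, and this toy closes a full segment lazily, when the next message arrives, which is a simplification.

```python
from dataclasses import dataclass, field


@dataclass
class Segment:
    first_offset: int
    entries: list = field(default_factory=list)
    closed: bool = False


class SegmentedLog:
    """Toy topic log: a list of segments where only the last one accepts writes."""

    def __init__(self, max_entries_per_segment: int = 50000):
        self.max_entries = max_entries_per_segment
        self.segments = [Segment(first_offset=0)]

    def append(self, payload: bytes) -> int:
        active = self.segments[-1]
        if len(active.entries) >= self.max_entries:
            # Roll over: close the full segment and open a new active one.
            active.closed = True
            active = Segment(first_offset=active.first_offset + len(active.entries))
            self.segments.append(active)
        active.entries.append(payload)
        return active.first_offset + len(active.entries) - 1

    def closed_segments(self):
        # Closed segments are immutable, so they are safe to copy elsewhere.
        return [s for s in self.segments if s.closed]


# A small segment size keeps the example readable.
log = SegmentedLog(max_entries_per_segment=3)
offsets = [log.append(b"m%d" % i) for i in range(7)]
```

After seven appends with a segment size of three, the log holds two closed segments and one active segment starting at offset 6.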

To use tiered storage in Pulsar, the user must configure a topic namespace with either a time-based or size-based policy for segment offload. When a topic in the namespace hits the threshold defined in the policy, the Pulsar broker copies the oldest segments from that topic’s log to long-term storage, until the topic is under the policy threshold. After a grace period, Pulsar deletes the original segments from BookKeeper, freeing up disk space.
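A size-based policy of this kind can be sketched as a selection over closed segments, oldest first. This is an illustration rather than Pulsar's implementation: `ClosedSegment` and `select_segments_to_offload` are hypothetical names, and the sketch ignores the active segment and the grace period before deletion.

```python
from dataclasses import dataclass


@dataclass
class ClosedSegment:
    segment_id: int
    size_bytes: int
    offloaded: bool = False


def select_segments_to_offload(closed_segments, threshold_bytes: int):
    """Pick the oldest segments to copy to long-term storage until the bytes
    left in BookKeeper are at or under the threshold.
    Assumes closed_segments is ordered oldest first."""
    on_bookkeeper = sum(s.size_bytes for s in closed_segments if not s.offloaded)
    to_offload = []
    for segment in closed_segments:
        if on_bookkeeper <= threshold_bytes:
            break
        if segment.offloaded:
            continue  # already in long-term storage
        to_offload.append(segment)
        on_bookkeeper -= segment.size_bytes
    return to_offload


segments = [ClosedSegment(i, size_bytes=100) for i in range(4)]
chosen = select_segments_to_offload(segments, threshold_bytes=250)
chosen_ids = [s.segment_id for s in chosen]
```

Here the topic holds 400 bytes against a 250-byte threshold, so the two oldest segments are selected, leaving 200 bytes in BookKeeper.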

Pulsar currently supports Amazon S3 and S3-compatible object stores as a long-term storage provider. Support for Google Cloud Storage will be available soon in Pulsar 2.2.0, and Azure Storage support is currently in the works. To see how you can set up tiered storage on your Pulsar cluster, check out our blog post on the subject.

[1] When using persistent topics.