Using Apache Pulsar as a message queue

May 17, 2018

Luc Perkins

Message queues are essential components of most large-scale data architectures. If every single work object that passes through your system absolutely must be processed, come hell or high water, then I'd urge you to use a message queue. Why? Because we know that data processing pipelines are subject to all kinds of failures. Data consumers can be laggy or fail entirely, network partitions can temporarily cut off entire groups of consumers from the data pipeline, and so on.

Here are some scenarios in which a message queue would be indispensable:

  • You're building a ride-sharing application and you need to make sure that each ride request is eventually paired with a driver, regardless of spikes in usage at peak times
  • You're adapting a fully synchronous financial transaction pipeline to enable asynchronous request handling without exposing the pipeline to data loss issues
  • You're building a microservices-based processing pipeline fronted by a REST API with very write-heavy endpoints (thousands of operations per second) and you need to make sure that all work objects are retained in the system even in the face of failure in backend microservices

How message queues work

Figure 1 below provides a basic illustration of how message queues typically work (and respond to failure):

Figure 1. Message queue with a failed consumer
Figure 1. Message queue with a failed consumer

In this figure, producers 1, 2, 3, and 4 are busy pumping messages into the pipeline through the message broker, while consumers 1, 2, 3, and 4 process (and then acknowledge) those messages. In this example, a serious problem emerges when consumer 1 fails. Producers are continuing to pump data into the system but consumer 1 can't handle its tasks. What should happen is that the broker begins storing all message data intended for consumer 1 until it's again ready to process incoming messages.

We can see from this example that a rock-solid storage component is essential to any message queue worth having in your stack. In fact, I'm comfortable declaring that your message queue is only as good as the storage system that backs it. If the storage component is brittle, lossy, or too slow to cope with the failure of any number of components then you're practically begging for trouble.

Enter Apache Pulsar

Traditionally, pub-sub messaging and message queuing have been handled by separate systems. In the past, a typical stack may have included Apache Kafka for pub-sub messaging and RabbitMQ for message queuing, to give an example. If that works for you, fine, but I do doubt that you enjoy deploying and managing multiple messaging systems side by side.

One of the things that I love most about Apache Pulsar is that it effortlessly straddles the divide between pub-sub and queuing. Pulsar is the first open source messaging system that is purpose-built to handle both use cases, and it can handle both with ease because it uses the Apache BookKeeper distributed log storage database as its storage component. BookKeeper is horizontally scalable (adding capacity requires adding more "bookies"), extremely fast, and, because it's a log storage system, built with data structures akin to message topics in mind.

Pulsar supports two basic topic types: persistent and non-persistent topics. You can tell a topic's type based on the name, as the type is the "scheme" of the topic name (just as https is the scheme of the URL https://google.com). A persistent topic would have a name like persistent://public/default/some-topic while a non-persistent topic would have a name like non-persistent://public/default/some-topic. When you use persistent topics, Pulsar stores all unacknowledged—i.e. unprocessed—messages in BookKeeper, on multiple "bookie" servers.

Pulsar does support non-persistent topics, but we recommend using non-persistent messaging only for use cases where message loss is acceptable. You should never use non-persistent topics for topics that are meant to function as message queue topics.

Handling message storage this way provides huge advantages over storing message data in memory.

How to use Apache Pulsar as a message queue

Pulsar succeeds here from a usability perspective because it covers both use cases without requiring any special configuration or knob tweaking. What matters here is how you use Pulsar, as illustrated in this diagram:

Figure 2. Apache Pulsar as a pub-sub system and message queue
Figure 2. Apache Pulsar as a pub-sub system and message queue

Here, pub-sub producers and consumers communicate with one another via a pub-sub topic while queue producers and consumers communicate via a queued topic. The topics don't need to be "marked" or pre-designated as real time or queued. The difference between the topics lies in that the message queue topic will need consumers to use shared subscriptions rather than exclusive or failover subscriptions (plus, all consumers must use the same subscription name, or else it's simply not the same subscription). When consumers establish a shared subscription on a topic, Pulsar automatically load balances between consumers receiving messages, which is optimal for message queues.

The code snippet below shows five Java consumers listening on the same topic with a shared subscription:

String PULSAR_SERVICE_URL = "pulsar://localhost:6650";
String MQ_TOPIC = "persistent://public/default/message-queue-topic";
String SUBSCRIPTION = "sub-1";

// Pulsar client
PulsarClient client = PulsarClient.builder()
        .serviceUrl(PULSAR_SERVICE_URL)
        .build();

// Base consumer builder for instantiating multiple consumers
ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer()
        .topic(MQ_TOPIC)
        .subscriptionName(SUBSCRIPTION)
        .subscriptionType(SubscriptionType.Shared)
        .messageListener(messageCallback);

// Create five consumers (mq-consumer-0, mq-consumer-1, etc.)
IntStream.range(0, 4).forEach(i -> {
    String name = String.format("mq-consumer-%d", i);
    consumerBuilder
            .consumerName(name)
            .subscribe();
});

Controlling message dispatch

Throughput is incredibly important in a message queue. A message queue that doesn't have the throughput to handle what its surrounding data pipeline requires could be not just sub-par but actively harmful. If you're using Pulsar as a message queue you can fine-tune processing throughput by adjusting your consumers' configuration.

By default, Apache Pulsar consumers have a receiver queue that they use to process many messages at a time. The size of each consumer's receiver queue is configurable (the default is 1000 messages). The ideal practice is to set the receiver queue size based on how quickly messages tend to be processed by that consumer. If processing tasks can be completed very quickly (just a few milliseconds) then it's best to make the receiver queue large, as this will help to maximize consumer processing throughput. But if processing tasks take more time it's best to opt for a smaller receiver queue size. If consumers are performing, say, CPU-intensive batch processing jobs that take several seconds or longer, then you may want to set the receiver queue size to just a few or one so that the load balancer properly distributes messages across consumers.

Here's an example of a consumer with a small receiver queue (for Java):

Consumer<byte[]> consumer = client.newConsumer()
        .topic("slow-processing-topic")
        .subscriptionType(SubscriptionType.Shared)
        .subscriptionName("sub-1")
        .receiverQueueSize(5)
        .messageListener(messageCallback)
        .subscribe();

For many use cases the default should be fine. But it's best to keep the receiver queue in mind in case you feel like your message queue could use some tuning.

One messaging platform, two prime use cases

The bottom line: if you're running multiple messaging platforms side by side just for the sake of serving different use cases, you should consider Pulsar. Pulsar can handle both major messaging use cases—pub-sub messaging (especially persistent messaging) and message queuing—in a way that is ultra-fast, scalable, and light on administrative burdens.

More on Apache Pulsar

If you want to learn more about Apache Pulsar, visit the official website at https://pulsar.incubator.apache.org. You can also participate in the Pulsar community via:

For getting the latest updates about Pulsar, you can follow the project on Twitter (https://twitter.com/apache_pulsar).