Geo-replication in Apache Pulsar, part 2: patterns and practices

October 10, 2017

Sijie Guo

In a previous blog post, I gave an overview of the geo-replication feature in Apache Pulsar. Apache Pulsar, leveraging the scalable stream storage provided by Apache BookKeeper, is a messaging system that supports both synchronous geo-replication (via Apache BookKeeper) and asynchronous geo-replication (configured at the Pulsar broker level) across multiple datacenters. In this blog post, I will describe a few patterns that people can use to set up geo-replication across multiple datacenters.

Asynchronous geo-replication in 30 seconds

First, let me spend 30 seconds explaining how asynchronous geo-replication in Apache Pulsar works. Figure 1 below illustrates a full-mesh geo-replication setup between 3 datacenters in Apache Pulsar.

Figure 1. Full-mesh geo-replication in three datacenters with Apache Pulsar

In this diagram, whenever producers P1, P2, and P3 publish messages to the topic T1 in Cluster A, Cluster B, and Cluster C, respectively, those messages are replicated asynchronously to the other clusters. Once replicated, consumers C1 and C2 can consume those messages from their respective clusters.

Geo-replication in Pulsar is enabled on a per-tenant basis and configured at the namespace level. It can be enabled between two specific clusters only when a tenant has access to both clusters. You create a namespace and configure it to replicate between two or more provisioned clusters that the tenant can access. Any message published on any topic in that namespace will then be replicated to all the clusters in the specified set.

Full-mesh geo-replication in 3 commands

With just a few commands we can see how easy it is to enable geo-replication in Apache Pulsar (in contrast with existing messaging systems, no additional replicator or mirror maker is needed for geo-replication). Let me explain this by setting up full-mesh geo-replication between Pulsar clusters.

Assume that there are 3 datacenters: us-west, eu-central, and apac-australia. Before you start, you need to create a property in Pulsar (a tenant) and grant that tenant permissions to access all three of these datacenters.

$ bin/pulsar-admin properties create my-property \
  --admin-roles my-admin-role \
  --allowed-clusters us-west,eu-central,apac-australia

Replication is configured at the namespace level. Let’s create a namespace called full-mesh:

$ bin/pulsar-admin namespaces create my-property/global/full-mesh

Initially, the namespace will not be assigned to any cluster. You can assign the namespace to a cluster—or multiple clusters—using the set-clusters command. Let’s assign this namespace to all three available clusters.

$ bin/pulsar-admin namespaces set-clusters my-property/global/full-mesh \
  --clusters us-west,eu-central,apac-australia

After issuing these three commands, you are all set with full-mesh geo-replication among three datacenters. Now, messages produced in any datacenter will be automatically replicated to the other two datacenters for consumption.

If you want to change replication settings in the future, for example if your company is building a fourth datacenter or retiring an existing one, you can change the replication setting at any time, with no disruption to ongoing traffic. Replication channels will be immediately set up or stopped in all the clusters as soon as the configuration changes are applied to those datacenters. The example command below adds a fourth datacenter apac-china to the namespace full-mesh.

$ bin/pulsar-admin namespaces set-clusters my-property/global/full-mesh \
  --clusters us-west,eu-central,apac-australia,apac-china

Selective replication per message

By default, the full-mesh namespace is set up with full-mesh replication between clusters, and messages are replicated to all the clusters configured for the namespace. You can restrict replication selectively, however, by directly specifying a replication list for a message at the application level. That message will then be replicated only to the subset in the replication list.

The code below shows an example, using the Pulsar Java client, of restricting the replication of a message to the datacenter apac-australia.

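import com.google.common.collect.Lists;  // Guava
import java.util.List;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;

// Replicate this message only to apac-australia.
List<String> restrictDatacenters = Lists.newArrayList("apac-australia");

Message message = MessageBuilder.create()
    .setContent("my-payload".getBytes())  // example payload
    .setReplicationClusters(restrictDatacenters)
    .build();

// producer is an existing Producer on a topic in the full-mesh namespace.
producer.send(message);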

Geo-replication patterns

By combining full-mesh replication with per-message replication lists, you have a great deal of flexibility to build any kind of replication topology you’d like between any number of datacenters in just a few commands.

Figure 2. Full-mesh geo-replication across 3 datacenters

Besides full-mesh geo-replication, there are a few other replication patterns you can use.

Active-active replication

This is a special case of full-mesh geo-replication with only two datacenters. Producers can run in either datacenter to produce messages, and consumers in each datacenter can consume all messages produced in both.

Active-standby replication

This is a special case of active-active geo-replication. Producers produce messages to the active datacenter while messages are replicated to the standby datacenter for backup. When the active datacenter goes down, the standby datacenter will take over and become the active datacenter. You can then make the producers produce messages to the standby datacenter (which is now the active datacenter).
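As a rough sketch, an active-standby pair needs nothing more than a namespace assigned to two clusters; the pattern comes from running producers only in the active datacenter. The cluster names dc-active and dc-standby and the namespace name active-standby are hypothetical, and the commands assume that my-property has already been granted access to both clusters.

```shell
# Hypothetical names: dc-active, dc-standby, and the active-standby namespace.
# Assumes my-property is already allowed on both clusters.
bin/pulsar-admin namespaces create my-property/global/active-standby
bin/pulsar-admin namespaces set-clusters my-property/global/active-standby \
  --clusters dc-active,dc-standby
```

Because replication for a namespace runs between all of its assigned clusters, messages published to the standby after a failover should be replicated back once the original datacenter returns.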

Figure 3. An active-standby geo-replication setup

Aggregation replication

Sometimes you want to replicate messages from multiple fronting datacenters to a central datacenter for aggregation purposes. For example, assume you have 3 fronting datacenters (front1, front2, and front3) and one aggregation datacenter (called aggregated). You can then create three namespaces: front1-aggregation for the topics used by the front1 datacenter, front2-aggregation for front2, and front3-aggregation for front3. Finally, you assign each namespace to its fronting datacenter and to the aggregated datacenter.
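The setup described above could be scripted roughly as follows. This is a sketch that assumes my-property has already been granted access to front1, front2, front3, and aggregated, and that only producers in the fronting datacenters publish to these namespaces.

```shell
# Assumes my-property is allowed on front1, front2, front3, and aggregated.
for dc in front1 front2 front3; do
  ns="my-property/global/${dc}-aggregation"
  bin/pulsar-admin namespaces create "$ns"
  # Each namespace spans one fronting datacenter plus the aggregation datacenter.
  bin/pulsar-admin namespaces set-clusters "$ns" --clusters "${dc},aggregated"
done
```

Keeping one namespace per fronting datacenter means that messages from front1 are never shipped to front2 or front3; they only flow to the aggregated datacenter.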

Figure 4. An aggregation geo-replication setup across 4 datacenters

Aggregation and edge computing

The aggregation pattern is typically seen when replicating Internet of Things (IoT) messages from the edge to the cloud. Pulsar supports this easily because you don’t have to set up heavyweight replicators or mirror makers. The flexibility and the tools to manage replication settings effectively become critical to the success of edge computing.

Geo-replication best practices

Besides the general instructions for setting up your own geo-replication scheme, there are a few best practices that we recommend following.

Monitoring the replication backlog

Monitoring a Pulsar cluster is important in any deployment, and even more so with geo-replication, because the chances of experiencing network partitions or degraded network performance between different datacenters are much higher than within a single datacenter. It is therefore critical that you monitor replication stats closely, especially the replication backlog.

The replication backlog is the number of messages that were produced to the origin cluster but have not yet been replicated to the other remote clusters. There are two important reasons to monitor the replication backlog:

  1. If there is a need to fail over from the datacenter of origin to the destination datacenter, all the messages that were produced at the origin and have not yet arrived at the destination will potentially be inaccessible until the origin datacenter comes back online.
  2. Any message processing that happens at the destination will be delayed by the backlog.

The backlog is typically just a few messages (depending on the network latency between the two datacenters), but it can grow larger if the network partitions. A backlog that keeps growing indicates that the replication throughput is lower than the produce rate at the origin cluster (for example, producers are producing messages at 100 MB/s at the origin cluster but Pulsar can only replicate messages at 50 MB/s). In that case, you need to add more brokers in the remote datacenter or more bandwidth between the two datacenters.
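To make the example concrete, the backlog grows at roughly the difference between the produce rate and the replication rate. The helper below is a back-of-the-envelope sketch, not part of Pulsar; the function name and its arguments are made up for illustration, and it assumes both rates stay constant.

```shell
# backlog_growth_mb PRODUCE_MB_PER_S REPLICATE_MB_PER_S SECONDS
# Prints how many MB of backlog accumulate over the given period,
# assuming constant rates. A negative result means the backlog is shrinking.
backlog_growth_mb() {
  echo $(( ($1 - $2) * $3 ))
}

# 100 MB/s produced, 50 MB/s replicated, for one hour (3600 s):
backlog_growth_mb 100 50 3600   # prints 180000 (about 180 GB)
```

In other words, at these rates an hour-long shortfall leaves about 180 GB waiting to be replicated.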

For more details on how and what to monitor on replication stats, please read the documentation on Pulsar stats.
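As a starting point, the per-topic stats command of the admin CLI reports replication stats, including a replicationBacklog value for each remote cluster. The topic name my-topic below is hypothetical, and the exact subcommand may differ between Pulsar versions.

```shell
# my-topic is a hypothetical topic in the full-mesh namespace.
# In the JSON output, look under "replication" for each remote
# cluster's "replicationBacklog".
bin/pulsar-admin persistent stats \
  persistent://my-property/global/full-mesh/my-topic
```

Polling this value periodically and alerting when it keeps rising is one simple way to catch a replication shortfall early.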

Capacity planning for datacenter failure

Another critical consideration when designing a geo-replication plan for multiple datacenters is ensuring that the messaging components (brokers in Apache Pulsar) and storage components (bookies in Apache BookKeeper) can sustain backlogs that build up over an extended period, from hours to several days, when one or more datacenters are down. It is important to at least have a plan in case the failover cluster doesn’t have enough capacity to absorb all the failover traffic.

It is also critical, once the cross-datacenter network issue is resolved, to have enough network and I/O bandwidth to drain the backlog faster than new messages are being produced, without impacting existing traffic.
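A quick way to sanity-check this is to estimate the drain time: the backlog only shrinks by the amount that replication throughput exceeds the ongoing produce rate. The function below is an illustrative sketch under the assumption of constant rates; drain_seconds and its arguments are hypothetical names, not a Pulsar tool.

```shell
# drain_seconds BACKLOG_MB REPLICATE_MB_PER_S PRODUCE_MB_PER_S
# Prints the seconds needed to drain the backlog (rounded up),
# or "never" if replication is not faster than production.
drain_seconds() {
  headroom=$(( $2 - $3 ))
  if [ "$headroom" -le 0 ]; then
    echo "never"
    return 1
  fi
  echo $(( ($1 + headroom - 1) / headroom ))
}

# A 180000 MB backlog, replicating at 150 MB/s while producers add 100 MB/s:
drain_seconds 180000 150 100   # prints 3600 (one hour)
```

If the estimate comes out as "never" or as many hours, that suggests provisioning more cross-datacenter bandwidth or broker capacity before an outage rather than after.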

Throttle replication

When a datacenter failure strikes, a lot of messages will accumulate in the destination clusters. When a datacenter recovers, Pulsar needs to replicate the messages that have accumulated in the destination clusters back to the origin cluster. Because there is a huge backlog to drain for replication, you have to pay attention to the draining rate for those backlogs and prioritize the I/O for important topics and throttle less important topics. This ensures that your business can recover to a normal state ASAP.

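For example, in Pulsar releases that include the set-replicator-dispatch-rate command, you can cap the rate at which each topic in the namespace replicates messages to remote clusters:

$ bin/pulsar-admin namespaces set-replicator-dispatch-rate my-property/global/full-mesh \
  --msg-dispatch-rate 20000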

Conclusion

In this blog post, we have provided some ideas on how to use asynchronous geo-replication in Apache Pulsar and have presented some recommended best practices. We hope that this provides you with a better understanding of Apache Pulsar and its geo-replication feature. In the next blog post, we’ll examine the durable messaging feature and effectively-once semantics in Apache Pulsar.

If you’re interested in Pulsar, you may want to participate in the Pulsar community.

For more information about the Apache Pulsar project in general, please visit the official website at http://pulsar.apache.org/ and follow the project on Twitter @apache_pulsar.