The Pulsar schema registry

June 6, 2018

Dave Rusek

When working with streaming data you’re all too often limited to using a single abstraction: streams of raw bytes. Raw bytes are nice because they’re an extremely flexible and neutral vehicle for data transfer, but this flexibility and neutrality come with a steep cost: you have to overlay your own type checking and serialization/deserialization to ensure that the bytes fed into the system can be read and successfully interpreted by processes reading the bytes coming out on the other end. In other words, you need to build all sorts of machinery just to make the data intelligible and usable to applications.

The old way: zero coordination, many headaches

One of the least fun things about dealing with raw bytes (among many!) is ensuring data compatibility in light of changes to data types. Personally, I used to see this all the time when working with streaming data. You add or remove a field and all hell breaks loose: pipelines start throwing obscure errors, dashboards stop updating, and hours of debugging ensue. Too often, information about the structure of the data isn’t located anywhere but in the binaries of the end systems, which means that you’re stuck looking through diffs trying to figure out where things went wrong. Nowhere is there a single canonical source of truth for making decisions about the structure of the data as it flows through the system.
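
To make that failure mode concrete, here is a minimal sketch of hand-rolled byte encoding. It has nothing to do with Pulsar's APIs; the class name, field layout, and values are all hypothetical. A producer moves from a v1 layout to a v2 layout that inserts a timestamp, while the consumer keeps decoding with the v1 layout.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of schema drift with raw bytes.
final class RawBytesDrift {

    // Version 1 layout: 4-byte sensor id, then 8-byte temperature.
    static byte[] encodeV1(int sensorId, double temperature) {
        return ByteBuffer.allocate(Integer.BYTES + Double.BYTES)
                .putInt(sensorId)
                .putDouble(temperature)
                .array();
    }

    // Version 2 layout: a timestamp is inserted between the two fields.
    static byte[] encodeV2(int sensorId, long timestampMillis, double temperature) {
        return ByteBuffer.allocate(Integer.BYTES + Long.BYTES + Double.BYTES)
                .putInt(sensorId)
                .putLong(timestampMillis)
                .putDouble(temperature)
                .array();
    }

    // A consumer that still assumes the version 1 layout.
    static double decodeTemperatureV1(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        buffer.getInt();           // skip the sensor id
        return buffer.getDouble(); // interprets the next 8 bytes as a temperature
    }

    public static void main(String[] args) {
        double fromV1 = decodeTemperatureV1(encodeV1(7, 21.5));
        double fromV2 = decodeTemperatureV1(encodeV2(7, 1528243200000L, 21.5));
        System.out.println("v1 payload decoded as: " + fromV1);
        System.out.println("v2 payload decoded as: " + fromV2);
    }
}
```

Decoding a v2 payload with the v1 reader does not throw. It quietly reinterprets the timestamp bytes as a temperature, which is the kind of error that tends to surface much later, in a dashboard or a downstream job.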

And even if you do have a place to store these definitions, it’s often not directly connected to your streaming system. Not only does this make it hard to keep pipelines consistent, but it also makes it virtually impossible to discover data within your system, especially in real time. On the other hand, if you know how the data is structured, you can build pipelines that adapt as the data changes. You can connect your streaming data to databases, indexes, or analytics tools with minimal effort and thus use your system to maximal benefit.

Help has arrived: introducing the Pulsar schema registry

Fortunately, there’s now a much better way. The 2.0 release of Pulsar includes the new Pulsar schema registry. Out of the box, the schema registry enables message producers and consumers on Pulsar topics to coordinate on the structure of the topic’s data through the Pulsar broker itself, without needing an external coordination mechanism. The structure of the data, or its schema, may be specified and uploaded either through a REST interface or by the message producer itself. Each piece of data that passes through the system is in turn stamped with the name and version of the schema it conforms to. With data schemas, every piece of data traveling through the system is discoverable, enabling you to build systems that can easily adapt as the data changes.
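
The idea of stamping data with a schema name and version can be sketched with a toy in-memory registry. This is not Pulsar's implementation or API: the class `ToySchemaRegistry`, its methods, and the string schema definitions are hypothetical, and the topic name stands in for the schema name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a schema registry; not Pulsar's implementation.
final class ToySchemaRegistry {

    // Schema definitions per topic; the list index doubles as the version number.
    private final Map<String, List<String>> schemasByTopic = new HashMap<>();

    // Stores a definition and returns its version. Re-registering a known
    // definition returns the version it already has.
    int register(String topic, String definition) {
        List<String> versions = schemasByTopic.computeIfAbsent(topic, t -> new ArrayList<>());
        int existing = versions.indexOf(definition);
        if (existing >= 0) {
            return existing;
        }
        versions.add(definition);
        return versions.size() - 1;
    }

    // Resolves the definition that a message was stamped with.
    String lookup(String topic, int version) {
        List<String> versions = schemasByTopic.get(topic);
        if (versions == null || version < 0 || version >= versions.size()) {
            throw new IllegalArgumentException("unknown schema: " + topic + " v" + version);
        }
        return versions.get(version);
    }

    public static void main(String[] args) {
        ToySchemaRegistry registry = new ToySchemaRegistry();
        int v0 = registry.register("sensor-data", "{sensorId, temperature}");
        int v1 = registry.register("sensor-data", "{sensorId, temperature, timestamp}");
        // A message only needs to carry the topic name and version; readers look up the rest.
        System.out.println("v" + v0 + " = " + registry.lookup("sensor-data", v0));
        System.out.println("v" + v1 + " = " + registry.lookup("sensor-data", v1));
    }
}
```

Because every message carries only a small (name, version) stamp, any reader can resolve the full structure on demand instead of having it compiled into its binary.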

Furthermore, the schema registry keeps track of data compatibility between versions of the schema. As new schemas are uploaded, the registry ensures that data written with a new schema version can still be read by old consumers, so that producers cannot break consumers. This is accomplished by allowing each schema type to define, at the broker, what “compatible” means. For instance, a JSON schema that adds fields may be considered compatible with the previous schema, since end systems can consume the data even without the new information. You can then upgrade your producers with a new schema and begin producing, confident that your consumers will not choke on the data.
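
As a rough illustration of the "adding fields is compatible" rule, the check below treats a schema as a plain set of field names. It is a simplified sketch under that assumption, not the broker's actual compatibility logic; `FieldCompatibility` and `oldConsumersCanRead` are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified compatibility rule: schemas are just sets of field names.
final class FieldCompatibility {

    // True when every field declared by the old schema is still present in the
    // new one, so consumers built against the old schema find all they expect.
    static boolean oldConsumersCanRead(Set<String> oldFields, Set<String> newFields) {
        return newFields.containsAll(oldFields);
    }

    public static void main(String[] args) {
        Set<String> v1 = new HashSet<>(Arrays.asList("sensorId", "temperature"));
        Set<String> v2 = new HashSet<>(Arrays.asList("sensorId", "temperature", "timestamp"));
        System.out.println("add a field:    " + oldConsumersCanRead(v1, v2));
        System.out.println("remove a field: " + oldConsumersCanRead(v2, v1));
    }
}
```

A real check would also have to consider field types and defaults, which is why the definition of compatibility is left to each schema type.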

To use a schema, the Producer and Consumer need only one extra piece of information: the schema itself. Once a schema is specified, you can send and receive data as fully inflated objects. The client and the broker will coordinate to ensure that your objects fit the schema definition.

In the following example, the Producer and Consumer agree to exchange data in JSON format. To make this clear to the broker, the Producer and Consumer are instantiated with the schema that the broker needs to verify their compatibility. In this case, JSONSchema is used to introspect the SensorReading class, a Jackson-compatible POJO, and create a JSON schema. This information is transmitted to the broker at connection time and checked by the broker for compatibility. If either the Producer or the Consumer is deemed incompatible, its connection will be terminated with an error.

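import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.schema.JSONSchema;

// Connect to a broker running locally
PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();

// The producer declares the schema of the objects it will send
Producer<SensorReading> producer =
    client.newProducer(JSONSchema.of(SensorReading.class))
        .topic("sensor-data")
        .create();

// The consumer declares the schema it expects to receive
Consumer<SensorReading> consumer =
    client.newConsumer(JSONSchema.of(SensorReading.class))
        .topic("sensor-data")
        .subscriptionName("sensor-subscriber")
        .subscribe();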
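
The example above does not show the SensorReading class itself. A minimal sketch of what such a Jackson-compatible POJO might look like follows; the three fields are assumptions made for illustration and do not come from the original example.

```java
// Hypothetical shape of SensorReading; the fields are illustrative assumptions.
// In a real project this would typically be a public class in SensorReading.java.
class SensorReading {

    private String sensorId;
    private double temperature;
    private long timestampMillis;

    // Jackson-style POJOs conventionally expose a no-argument constructor
    // plus a getter and setter for each property.
    public SensorReading() {
    }

    public String getSensorId() {
        return sensorId;
    }

    public void setSensorId(String sensorId) {
        this.sensorId = sensorId;
    }

    public double getTemperature() {
        return temperature;
    }

    public void setTemperature(double temperature) {
        this.temperature = temperature;
    }

    public long getTimestampMillis() {
        return timestampMillis;
    }

    public void setTimestampMillis(long timestampMillis) {
        this.timestampMillis = timestampMillis;
    }
}
```

With the typed Producer and Consumer from the example, an instance of this class would be handed to producer.send(...) on one side and read back on the other with consumer.receive().getValue(), with no manual serialization in between.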

Out of the box, the Pulsar Schema Registry is backed by Apache BookKeeper, the same storage layer used to keep track of your streaming data. However, the schema storage backend is fully pluggable and supports interoperability with other schema registry tools, allowing you to leverage your existing schemas.

Currently, the Pulsar Schema Registry supports JSON and “binary” (your good old binary stream of data) representations out of the box. Support for additional formats such as Avro, Protobuf, and Thrift is planned for the near future. As with the schema storage, schema support is fully pluggable and open for extension.

For more info about the Pulsar schema registry, see the official Pulsar documentation.