Introducing Pulsar IO

August 20, 2018

Jerry Peng

Apache Pulsar is a state-of-the-art messaging system.  One of the most common questions that comes up when using platforms like this is: what is the best way to move data into and out of the platform?  Users can, of course, write custom code using Pulsar’s consumer and producer APIs to accomplish the desired data movement, but is that the only available approach?

There are a few related questions that users have asked:

  1. Where should I run my application to publish data to or consume data from Pulsar?
  2. How should I run my application to publish data to or consume data from Pulsar?

These questions arise because other messaging/pub-sub systems lack an organized, fault-tolerant way to run applications that move data between external systems and the messaging system, forcing users to build custom solutions and run them manually.

To address these questions and make this process much less cumbersome, we’re excited to introduce Pulsar IO, a framework that allows you to move data into and out of Pulsar using the existing Pulsar Functions framework.  As a result, applications that ingress and egress data from Pulsar get all the benefits of the Pulsar Functions framework, such as fault tolerance, parallelism, elasticity, load balancing, on-demand updates, and much more.

Moreover, we have seen countless times that users struggle to write custom applications to move data into or out of messaging systems because they are not experts (nor do they wish to be) in the systems involved.  Not only can these applications be difficult to write, we also often see duplicated effort when many users implement applications that do the same thing. At the end of the day, a messaging system is only a tool for accomplishing data movement. Thus, one of our key goals when designing the Pulsar IO framework was ease of use.  We want users to be able to move data between Pulsar and external systems without having to write any code or become experts in both Pulsar and the external system.  We will explain how this is accomplished later in this post.

What does the framework look like?

Let’s first define an application that ingresses data into Pulsar as a source and an application that egresses data from Pulsar as a sink.

Figure 1. Pulsar sources and sinks

A source is an application that ingests data from an external system into Pulsar, while a sink is an application that egresses data from Pulsar to an external system.  To be more concrete, a source will read data from an external system and write the data to a Pulsar topic, while a sink will read data from one or more Pulsar topics and write to an external system.

The Pulsar IO framework runs on top of the existing Pulsar functions framework.  Individual sources and sinks can run like any function alongside Pulsar brokers as shown in Figure 2.

Figure 2. Sources, sinks, and functions running on brokers

Thus, all the benefits of the Pulsar Functions framework are also available to the Pulsar IO framework, i.e. to sink and source applications.

As mentioned before, one of our design goals is for users not to have to write any custom applications, or for that matter any code at all, to move data into and out of Pulsar. Thus, we provide a wide variety of built-in sources and sinks (Kafka, Twitter Firehose, Cassandra, Aerospike, etc., with many more coming) that users can run with just a single command.  This frees users from implementation details so they can focus on their business logic.

How to use Pulsar IO

Using the Pulsar IO framework is easy.  You can start running a built-in source or sink with a single CLI command. You can submit a source to be run in an existing Pulsar cluster, for example, using a command of this form:

$ ./bin/pulsar-admin source create \
  --tenant <tenant> \
  --namespace <namespace> \
  --name <source-name> \
  --destinationTopicName <output-topic> \
  --source-type <source-type>

Below is an example command for running the Twitter firehose source, which ingresses data from Twitter into Pulsar:

$ ./bin/pulsar-admin source create \
  --tenant test \
  --namespace ns1 \
  --name twitter-source \
  --destinationTopicName twitter_data \
  --sourceConfigFile examples/twitter.yml \
  --source-type twitter

That is all you need to start ingesting data into Pulsar!  There is no need to write or compile any code. The only thing you may need is a config file that specifies certain configs for the source or sink. You can submit a built-in sink to be run in an existing Pulsar cluster by using a command of this form:

$ ./bin/pulsar-admin sink create \
  --tenant <tenant> \
  --namespace <namespace> \
  --name <sink-name> \
  --inputs <input-topics> \
  --sink-type <sink-type>

Below is an example command for running the Cassandra sink that egresses data from Pulsar to Cassandra:

$ ./bin/pulsar-admin sink create \
  --tenant public \
  --namespace default \
  --name cassandra-test-sink \
  --sink-type cassandra \
  --sinkConfigFile examples/cassandra-sink.yml \
  --inputs test_cassandra

For a more comprehensive, end-to-end guide on running the Cassandra sink, please follow the instructions in the quickstart guide: http://pulsar.apache.org/docs/en/io-quickstart/
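Once a source is submitted, a quick way to spot-check that data is actually flowing is to consume a few messages from the source’s destination topic with the pulsar-client CLI. The command below is a sketch assuming the Twitter source example above is running against a local cluster (subscription name is arbitrary):

```shell
# Consume 5 messages from the topic the Twitter source writes to,
# using a throwaway subscription named "verify-sub".
$ ./bin/pulsar-client consume \
  -s verify-sub \
  -n 5 \
  persistent://test/ns1/twitter_data
```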

The above commands show you how to run sources and sinks in “cluster” mode, i.e. as part of an existing Pulsar cluster. But you also have the option to run sources and sinks as standalone processes in “local run” mode, which spawns a local process on a machine to execute the source or sink logic.  Local run mode can be helpful for testing and debugging; however, monitoring and supervision are left for you to manage. Below is an example command for running a sink in local run mode:

$ ./bin/pulsar-admin sink localrun \
  --tenant public \
  --namespace default \
  --name cassandra-test-sink \
  --sink-type cassandra \
  --sinkConfigFile examples/cassandra-sink.yml \
  --inputs test_cassandra

Since the Pulsar IO framework runs on top of Pulsar Functions, you can dynamically update a source or sink with new parameters and configs.  For example, if I want the Twitter firehose source I mentioned previously to ingress data into a different Pulsar topic, I can do that with a simple command:

$ ./bin/pulsar-admin source update \
  --tenant test \
  --namespace ns1 \
  --name twitter-source \
  --destinationTopicName twitter_data_2 \
  --sourceConfigFile examples/twitter.yml \
  --source-type twitter

The same format can be used to update sinks as well.  Almost any config can be updated while the sink or source is still running, which makes modifying, testing, and deploying a much more streamlined process.
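Another config that can be changed on the fly is the number of running instances of a connector. The exact flag name may vary by version; the sketch below assumes a --parallelism option, so check ./bin/pulsar-admin source update --help for your release before relying on it:

```shell
# Scale the running twitter-source to 2 instances
# (--parallelism flag assumed; verify against your version's help output).
$ ./bin/pulsar-admin source update \
  --tenant test \
  --namespace ns1 \
  --name twitter-source \
  --parallelism 2
```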

If you have a niche use case that requires a custom implementation, you can create a source or sink yourself by implementing a simple interface.  However, the goal of Pulsar IO is for most users to not have to implement a source or sink themselves, but rather use an existing built-in one.

Implementing a custom source

To create a custom source, you need to write a Java class that implements the Source interface:

public interface Source<T> extends AutoCloseable {
    /**
     * Open source with configuration
     *
     * @param config initialization config
     * @throws Exception IO type exceptions when opening a connector
     */
    void open(final Map<String, Object> config) throws Exception;

    /**
     * Reads the next message from the source.
     * If the source does not have any new messages, this call should block.
     *
     * @return next message from the source; the return result should never be null
     * @throws Exception
     */
    Record<T> read() throws Exception;
}

Here’s a simple example of a source implementation:

public class TestSource implements Source<Integer> {
    private int i = 0;

    @Override
    public void open(Map<String, Object> config) throws Exception {
        // no initialization needed for this example
    }

    @Override
    public Record<Integer> read() throws Exception {
        // Record<T> is a functional interface, so a lambda suffices here
        return () -> i++;
    }

    @Override
    public void close() throws Exception {
        // no resources to release
    }
}

In the example source above, a monotonically increasing integer is ingested into Pulsar.  The “read” method must return an object that implements the “Record” interface because the “Record” interface contains fields you can use to implement different message delivery semantics or guarantees, such as exactly-once/effectively-once.  I’ll leave a detailed discussion of how to do that for subsequent blog posts.
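To make the blocking contract of “read” concrete, here is a slightly larger sketch: a hypothetical source backed by a BlockingQueue, where read blocks until data arrives (the offer method stands in for the external system pushing data). The Source and Record interfaces are redeclared locally so the snippet compiles on its own; in a real connector you would import them from Pulsar instead.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Local stand-ins mirroring the Pulsar interfaces shown above,
// so this example is self-contained.
interface Record<T> {
    T getValue();
}

interface Source<T> extends AutoCloseable {
    void open(Map<String, Object> config) throws Exception;
    Record<T> read() throws Exception;
}

public class QueueSource implements Source<String> {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    @Override
    public void open(Map<String, Object> config) throws Exception {
        // A real connector would establish its connection to the
        // external system here, using values from config.
    }

    @Override
    public Record<String> read() throws Exception {
        // take() blocks until an element is available, satisfying the
        // contract that read() should block when there is no new data.
        String value = queue.take();
        return () -> value;
    }

    // Hypothetical hook standing in for the external system delivering data.
    public void offer(String value) {
        queue.offer(value);
    }

    @Override
    public void close() throws Exception {
        // no resources to release in this sketch
    }
}
```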

Implementing a custom sink

To create a custom sink, you need to write a Java class that implements the Sink interface:

public interface Sink<T> extends AutoCloseable {
   /**
    * Open Sink with configuration
    *
    * @param config initialization config
    * @throws Exception IO type exceptions when opening a connector
    */
   void open(final Map<String, Object> config) throws Exception;

   /**
    * Write a message to Sink
    * @param inputRecordContext Context of value
    * @param value value to write to sink
    * @throws Exception
    */
   void write(RecordContext inputRecordContext, T value) throws Exception;
}

For example, a simple implementation of a sink could be:

public class TestSink implements Sink<String> {
   private static final String FILENAME = "/tmp/test-out";
   private BufferedWriter bw = null;
   private FileWriter fw = null;
   @Override
   public void open(Map<String, Object> config) throws Exception {
       File file = new File(FILENAME);
        // if the file doesn't exist, create it
       if (!file.exists()) {
           file.createNewFile();
       }
       fw = new FileWriter(file.getAbsoluteFile(), true);
       bw = new BufferedWriter(fw);
   }

   @Override
   public void write(RecordContext inputRecordContext, String value) throws Exception {
       try {
           bw.write(value);
           bw.flush();
       } catch (IOException e) {
           throw new RuntimeException(e);
       }
   }

   @Override
   public void close() throws Exception {
       try {
           if (bw != null)
               bw.close();
           if (fw != null)
               fw.close();
       } catch (IOException ex) {
           ex.printStackTrace();
       }
   }
}

In the sink example above, data read from Pulsar is written to a file.  Similar to the source interface, the “write” method in the sink interface has a parameter called RecordContext.  This parameter provides the sink with the context of the value that needs to be written to an external system. The RecordContext parameter can be used to implement a sink that provides different levels of message delivery semantics or guarantees, such as exactly-once/effectively-once.  We will do a deeper dive into this in subsequent blog posts.
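A handy pattern when developing a custom sink is an in-memory implementation that simply records every value the framework hands to “write”, so the sink logic can be unit-tested before wiring up a real external system. The sketch below redeclares the Sink interface locally (with RecordContext as an empty stand-in type, since we only need it for the signature here) so it is self-contained:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Local stand-ins mirroring the Pulsar interfaces shown above,
// so this example is self-contained.
interface RecordContext {
}

interface Sink<T> extends AutoCloseable {
    void open(Map<String, Object> config) throws Exception;
    void write(RecordContext ctx, T value) throws Exception;
}

public class ListSink implements Sink<String> {
    private final List<String> written = new ArrayList<>();

    @Override
    public void open(Map<String, Object> config) throws Exception {
        // nothing to initialize for an in-memory sink
    }

    @Override
    public void write(RecordContext ctx, String value) throws Exception {
        // A real sink would write to the external system here, and could
        // use ctx to tie delivery guarantees to the durability of the write.
        written.add(value);
    }

    // Inspection hook for tests.
    public List<String> getWritten() {
        return written;
    }

    @Override
    public void close() throws Exception {
        // no resources to release
    }
}
```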

You can submit custom sources and sinks in a similar way to the built-in ones:

$ ./bin/pulsar-admin source create \
  --className  <classname> \
  --jar <jar-location> \
  --tenant <tenant> \
  --namespace <namespace> \
  --name <source-name> \
  --destinationTopicName <output-topic>

Here’s an example command:

$ ./bin/pulsar-admin source create \
  --className org.apache.pulsar.io.twitter.TwitterFireHose \
  --jar ~/application.jar \
  --tenant test \
  --namespace ns1 \
  --name twitter-source \
  --destinationTopicName twitter_data

Similarly, you can submit a custom sink to be run in an existing Pulsar cluster using a command of this form:

$ ./bin/pulsar-admin sink create \
  --className  <classname> \
  --jar <jar-location> \
  --tenant <tenant> \
  --namespace <namespace> \
  --name <sink-name> \
  --inputs <input-topics>

Here’s an example command:

$ ./bin/pulsar-admin sink create \
  --className  org.apache.pulsar.io.cassandra \
  --jar ~/application.jar \
  --tenant test \
  --namespace ns1 \
  --name cassandra-sink \
  --inputs test_topic

Benefits of using the Pulsar IO framework

As mentioned above, the Pulsar IO framework runs on top of the existing Pulsar functions framework. By taking advantage of the existing Pulsar Functions framework, sources and sinks that are a part of Pulsar IO get all the existing benefits of Pulsar Functions:

  - Execution flexibility: Sources and sinks can run as part of an existing cluster or as standalone processes.
  - Parallelism: To increase the throughput of a source or sink, multiple instances can be run by adding a simple configuration.
  - Load balancing: When sources and sinks run in “cluster” mode, their load is balanced across the available nodes.
  - Fault tolerance, monitoring, and metrics: When sources and sinks run in “cluster” mode, the worker service that is part of the Pulsar Functions framework automatically monitors deployed sources and sinks.  When nodes fail, sources and sinks are redeployed to operational nodes. Metrics are also collected automatically.
  - Dynamic updates: Each connector’s parallelism, source code, ingress and egress topics, and many other configurations can be changed on the fly.
  - Data locality: Since brokers serve read and write requests for topics, running sources and sinks adjacent to brokers may reduce network latency and bandwidth usage.

How can I try it out?

We hope we have piqued your interest in Pulsar IO and convinced you that it is an easy-to-use framework for moving data into and out of Pulsar. The Pulsar IO framework officially ships with the Pulsar 2.1.0 release:
https://github.com/apache/incubator-pulsar/releases/tag/v2.1.0-incubating

More documentation can be found here:
http://pulsar.apache.org/docs/en/io-overview/