Debugging Pulsar Functions in Java

June 17, 2019

Jerry Peng

In this tutorial, I will go over a couple of methods for testing and debugging Pulsar Functions from your local development environment.

A Brief Overview of Pulsar Functions

Pulsar Functions is a stream processing framework that allows users to:

  1. Consume data from one or more Pulsar topics
  2. Execute user-defined logic on the data
  3. Optionally produce results to another topic

Pulsar Functions can also be viewed as a programming paradigm that models the processing of a data stream as a function with an input and an output.
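To make the consume, process, produce cycle concrete, here is a minimal in-memory sketch that uses only the JDK. Lists stand in for the input and output topics, and the TopicPipelineSketch class and its run helper are hypothetical names for illustration; a real Pulsar Function is driven by the Pulsar runtime, which reads from and writes to actual topics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class TopicPipelineSketch {
    // Stand-in for Code Snippet 1: the user logic is a plain java.util.function.Function.
    static final Function<String, String> EXCLAIM = input -> String.format("%s!", input);

    // Hypothetical helper: "consume" each message from the input list, apply the
    // user-defined function, and "produce" the result to the output list.
    // In this sketch a null result means nothing is produced, since output is optional.
    static List<String> run(List<String> inputTopic, Function<String, String> fn) {
        List<String> outputTopic = new ArrayList<>();
        for (String message : inputTopic) {
            String result = fn.apply(message);
            if (result != null) {
                outputTopic.add(result);
            }
        }
        return outputTopic;
    }

    public static void main(String[] args) {
        List<String> out = run(List.of("hello", "pulsar"), EXCLAIM);
        System.out.println(out); // prints [hello!, pulsar!]
    }
}
```

Because the user logic is an ordinary function object, it can be exercised in isolation from any messaging infrastructure, which is what makes the unit-testing approach in this tutorial possible.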

Below is a simple example of a Pulsar Function:

import java.util.function.Function;

public class JavaNativeExclamationFunction implements Function<String, String> {
  @Override
  public String apply(String input) {
    return String.format("%s!", input);
  }
}

Code Snippet 1

Or if you would like to use the Function API from the Pulsar Functions SDK:

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class ExclamationFunction implements Function<String, String> {
  @Override
  public String process(String input, Context context) {
    return String.format("%s!", input);
  }
}

Code Snippet 2

For more information on Pulsar Functions, please see our blog post on Pulsar Functions: https://streaml.io/blog/pulsar-functions

How to Debug a Function

At its core, a Pulsar Function is just another function with an input and an output. You can write unit tests for a Pulsar Function just as you would for any other function.

For example, you can test the Pulsar Function from Code Snippet 1 by writing a JUnit test like the following:

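import org.junit.Assert;
import org.junit.Test;

@Test
public void testJavaNativeExclamationFunction() {
  JavaNativeExclamationFunction exclamation = new JavaNativeExclamationFunction();
  String output = exclamation.apply("foo");
  // JUnit's assertEquals takes the expected value first, then the actual value
  Assert.assertEquals("foo!", output);
}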

Writing a test for a Pulsar Function implemented with the SDK (e.g. Code Snippet 2) is essentially the same as above; you only need to mock the Context argument, for example with Mockito. Below is an example:

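import static org.mockito.Mockito.mock;
import org.apache.pulsar.functions.api.Context;
import org.junit.Assert;
import org.junit.Test;

@Test
public void testExclamationFunction() {
  ExclamationFunction exclamation = new ExclamationFunction();
  // The function never uses its Context, so an empty Mockito mock is enough
  String output = exclamation.process("foo", mock(Context.class));
  Assert.assertEquals("foo!", output);
}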
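The JUnit test for Code Snippet 1 can be extended to cover several inputs at once, including edge cases such as an empty message. The sketch below is a table-driven variant that uses only the JDK, so it runs without JUnit or Pulsar on the classpath; the ExclamationCases class and its check helper are hypothetical, and the function body is copied from Code Snippet 1.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class ExclamationCases {
    // Copy of Code Snippet 1 so this file compiles without Pulsar on the classpath.
    static class JavaNativeExclamationFunction implements Function<String, String> {
        @Override
        public String apply(String input) {
            return String.format("%s!", input);
        }
    }

    // Hypothetical helper: applies fn to each input and returns the number of
    // cases whose actual output differs from the expected output.
    static int check(Function<String, String> fn, Map<String, String> cases) {
        int failures = 0;
        for (Map.Entry<String, String> c : cases.entrySet()) {
            String actual = fn.apply(c.getKey());
            if (!c.getValue().equals(actual)) {
                System.out.printf("input=%s expected=%s actual=%s%n", c.getKey(), c.getValue(), actual);
                failures++;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        Map<String, String> cases = new LinkedHashMap<>();
        cases.put("foo", "foo!");
        cases.put("", "!");          // empty message
        cases.put("wow!", "wow!!");  // input that already ends with '!'
        System.out.println(check(new JavaNativeExclamationFunction(), cases) + " failures"); // prints 0 failures
    }
}
```

In a real project the same table would typically feed a parameterized JUnit test instead of a main method.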

Local-run mode

Please note that this feature is available via the Streamlio distribution of Apache Pulsar (2.4.0-streamlio-25). We have contributed this feature to Apache Pulsar and expect it to be part of the Apache Pulsar 2.4 release.

If you want to test your Pulsar Function in a more realistic setting, perhaps even with actual data from a Pulsar cluster, you can launch it in localrun mode. Localrun mode lets you bring up an instance of your Pulsar Function locally, on your machine or inside your IDE, for easy debugging.

First, you need to add the pulsar-functions-local-runner as a dependency:

<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-functions-local-runner</artifactId>
  <version>2.4.0-streamlio-25</version>
</dependency>

For information on how to add the Streamlio Maven repository to your build, please refer to
https://docs.streaml.io/pulsar/distribution#java-client-library

Then, you can start your Pulsar Function in localrun mode as follows:

FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setName(functionName);
functionConfig.setInputs(Collections.singleton(sourceTopic));
functionConfig.setClassName(ExclamationFunction.class.getName());
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setOutput(sinkTopic);

LocalRunner localRunner = LocalRunner.builder().functionConfig(functionConfig).build();
localRunner.start(true);

You can also run your Pulsar Function directly from your IDE by giving the function class a main method, for example:

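import java.util.Collections;

import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.LocalRunner;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class ExclamationFunction implements Function<String, String> {
  @Override
  public String process(String s, Context context) throws Exception {
    return s + "!";
  }

  // Launches this function in localrun mode against a Pulsar standalone cluster on this machine
  public static void main(String[] args) throws Exception {
    FunctionConfig functionConfig = new FunctionConfig();
    functionConfig.setName("exclamation");
    functionConfig.setInputs(Collections.singleton("input"));
    functionConfig.setClassName(ExclamationFunction.class.getName());
    functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
    functionConfig.setOutput("output");

    LocalRunner localRunner = LocalRunner.builder().functionConfig(functionConfig).build();
    localRunner.start(false);
  }
}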

This also lets you set breakpoints anywhere in your function and step through its code with your IDE's debugger.

The above example assumes you have a Pulsar standalone cluster already running on your local machine. If you would like to connect your Pulsar Function to a specific Pulsar cluster, you can specify a URL for the Pulsar broker you would like to connect to.

For example:

LocalRunner localRunner = LocalRunner.builder()
  .brokerServiceUrl("<BROKER_URL>") // e.g. pulsar://<broker-host>:6650
  .functionConfig(functionConfig)
  .build();
localRunner.start(false);
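If you switch often between a local standalone cluster and a remote one, one option is to decide the broker URL at startup rather than editing code. The sketch below uses only the JDK; the PULSAR_BROKER_URL environment variable and the BrokerUrlResolver class are hypothetical names chosen for illustration, while pulsar://localhost:6650 is the default service URL of a Pulsar standalone cluster. The resolved string would then be passed to brokerServiceUrl(...) on the LocalRunner builder.

```java
import java.util.Map;

public class BrokerUrlResolver {
    // Default broker service URL of a Pulsar standalone cluster on the local machine.
    static final String STANDALONE_URL = "pulsar://localhost:6650";

    // Hypothetical convention: use PULSAR_BROKER_URL if it is set and non-blank,
    // otherwise fall back to the local standalone cluster.
    static String resolve(Map<String, String> env) {
        String url = env.get("PULSAR_BROKER_URL");
        return (url == null || url.isBlank()) ? STANDALONE_URL : url;
    }

    public static void main(String[] args) {
        String brokerUrl = resolve(System.getenv());
        // brokerUrl is what you would hand to LocalRunner.builder().brokerServiceUrl(...)
        System.out.println("Connecting to " + brokerUrl);
    }
}
```

Taking the environment as a Map parameter, instead of calling System.getenv() inside resolve, keeps the helper easy to unit test.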

For complete code examples, please take a look at this sample project:

https://github.com/jerrypeng/pulsar-functions-demos/tree/master/debugging

More Information

For more information on Pulsar or Pulsar Functions, please see the following:

  1. https://streaml.io/blog/pulsar-functions
  2. https://streaml.io/blog/pulsar-functions-for-developers
  3. http://pulsar.apache.org/docs/en/functions-overview