Simple Event Processing with Apache Pulsar Functions

October 9, 2018

David Kjerrumgaard

In our previous blog post, we introduced the concept of event-driven architecture (EDA) and provided a high-level reference architecture for an EDA built using the core technologies within the Streamlio platform.

Figure 1: Streamlio EDA Reference Architecture

In this blog post, we will take a deep dive into simple event processors and the capabilities provided by the Streamlio platform to support them.

Event-Based Programming

We have already established that the defining feature of an event-driven architecture is the central importance of events. Within an EDA, event consumers react to the arrival of events following a programming style referred to as event-based programming (EBP). In contrast to batch-oriented or procedural programming, in EBP the software system executes processing in response to receiving one or more event notifications, and communicates with other software components entirely via events in an asynchronous manner.

While not all event-based applications are the same, they typically follow the structure shown in the figure below: event producers introduce events into an event processing framework in the middle, which is responsible for persisting the events and delivering them to the event consumers.

Figure 2: Event-Based Architecture

In addition to its event-routing responsibilities, the intermediate event processing framework also hosts the components that we will refer to as event processors. Event processors ingest events and can forward events or emit new ones, so they are in some sense both event consumers and producers. However, we won’t refer to these event processors as either event producers or event consumers as we wish to distinguish them from entities that live outside of the event processing framework.

Types of Event Processors

Within an EDA, event processors generally fall into one of the following categories:

  • Simple Event Processor: The arrival of an event immediately triggers an action within the event processor. These processors are typically either stateless and perform all of their logic based on the current event’s contents only, or stateful and can retain information across invocations in order to perform slightly more complex logic.

  • Complex Event Processor: This type of event processor handles a series of events and performs much more sophisticated pattern analysis for the purpose of identifying meaningful patterns or relationships such as detecting event correlation, causality, or timing. Typical use cases can be found in e-commerce, fraud detection, cybersecurity, financial trading, and other environments that require immediate responses.

Event Processing Networks

As one can imagine, an event-based application typically consists of a number of event processors arranged in a specific sequence or flow. We refer to this collection of event producers, event processors, and event consumers as an event processing network. These event processing networks are composed to solve one or more specific business problems.

As you can see below in Figure 3, the external event producers and consumers are at the edges, with several event processors in between. The lines show the flow of events between the event processors. These lines are referred to as implicit channels and are used to push events directly from one event processor to another. In Apache Pulsar, these implicit channels are implemented as topics.

There is also another method of inter-event-processor communication depicted in Figure 3: shared state management. It is not uncommon for an event processor to need to retain computational state across multiple events. The event processing framework needs to provide a mechanism for persisting this state and allowing it to be accessed by event processors directly. This shared state facility provides another mechanism for sharing information between event processors and enables stateful event processing, which we will discuss in the next section.

Figure 3: Event Processing Network

Note that multiple event-based applications can be associated with a single event type. In the above diagram, we depict the chaining of one event-based application (in blue) to a different application. The output of the first application is fed to both event consumer 1 as well as to another event processor for further processing before being sent to event consumer 2.

There are a variety of reasons why you would want to chain event-based applications together. For instance, consider a scenario in which you are monitoring an IoT sensor reading for patterns or abnormalities, while at the same time you wish to retain these events in long-term storage (e.g. HDFS or Amazon S3) for your data science team to use for model training.

The first sequence of event processors would handle the ETL-type processing of the events to transform them into a consumable format. Those records would be sent to event consumer 1, which in this case would be HDFS. At the same time, we also want to forward these cleansed events to our secondary sequence of processors that implements the anomaly detection workflow. In the next section we will explore using Apache Pulsar Functions as a framework for implementing event-based processing using simple programming logic functions.
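Conceptually, this kind of chaining resembles function composition: the output of the ETL stage feeds both a sink and the next processing stage. The sketch below illustrates the idea in plain Java, with no Pulsar dependency; the cleansing and detection steps are hypothetical stand-ins for real processors.

```java
import java.util.function.Function;

public class ChainingSketch {
    public static void main(String[] args) {
        // Hypothetical ETL stage: normalize a raw sensor reading
        Function<String, Double> cleanse = raw -> Double.parseDouble(raw.trim());

        // Hypothetical anomaly-detection stage: flag readings above a threshold
        Function<Double, Boolean> detect = temp -> temp > 100.0;

        // Chaining the stages, much as an event processing network does via topics
        Function<String, Boolean> pipeline = cleanse.andThen(detect);

        System.out.println(pipeline.apply(" 72.5 "));   // false (normal reading)
        System.out.println(pipeline.apply(" 120.0 "));  // true (anomalous reading)
    }
}
```

In the real system, each stage would be a separate Pulsar Function and the intermediate value would travel over a topic rather than a Java method call.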

Event-Based Programming with Apache Pulsar Functions

Apache Pulsar Functions provides an easy-to-use framework that developers can use to create and deploy processing logic that is executed by the Apache Pulsar messaging system. With Pulsar Functions, you can write functions of any level of complexity in Java or Python and deploy them to a Pulsar cluster without needing to run a separate stream processing engine. Pulsar Functions are lightweight compute processes that:

  • Are executed each time a message is published on a specified input topic.
  • Apply user-supplied processing logic to each message.
  • Publish the results of the computation to one or more topics.
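The three steps above can be sketched in plain Java (purely illustrative, with queues standing in for topics; no Pulsar dependency):

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Function;

public class ProgrammingModelSketch {
    public static void main(String[] args) {
        // Queues stand in for the input and output topics
        Queue<String> inputTopic = new ArrayDeque<>();
        Queue<String> outputTopic = new ArrayDeque<>();
        inputTopic.add("hello");
        inputTopic.add("world");

        // User-supplied processing logic applied to each message
        Function<String, String> fn = String::toUpperCase;

        // The framework's loop: executed each time a message arrives
        while (!inputTopic.isEmpty()) {
            outputTopic.add(fn.apply(inputTopic.poll()));
        }

        System.out.println(outputTopic);  // [HELLO, WORLD]
    }
}
```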
Figure 4: The Pulsar Functions Programming Model

Pulsar Functions can be written in Java or Python, and in both languages you have two options for writing a function:

  • Using the language-native interface, which requires NO Pulsar-specific libraries or special dependencies. For example, in order to implement a Pulsar Function in Java you simply need to write a class that implements the java.util.function.Function interface shown below:
      import java.util.function.Function;

      public class EchoFunction implements Function<String, String> {
         public String apply(String input) {
            // Simply echo the input back as the output
            return input;
         }
      }
  • Using the Pulsar Functions SDK, which leverages Pulsar-specific libraries that provide a range of functionality not available in the “native” interfaces, such as state management capabilities provided by the org.apache.pulsar.functions.api.Context object.
      package org.apache.pulsar.functions.api;

      public interface Function<I, O> {
          O process(I input, Context context) throws Exception;
      }

As you can see, the language-native approach provides a clean, API-free way of writing Pulsar Functions, and is ideal for the development of stateless event processors. However, the trade-off for this simplicity is the lack of access to any previous state information.
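To make the native style concrete, a complete stateless processor might look like the following hypothetical transformer (not taken from the Pulsar documentation):

```java
import java.util.function.Function;

// Hypothetical stateless processor using only the language-native interface
public class ReverseFunction implements Function<String, String> {
    @Override
    public String apply(String input) {
        // Pure, per-event logic: no access to state, config, or metrics
        return new StringBuilder(input).reverse().toString();
    }
}
```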

Deploying Apache Pulsar Functions

After you have compiled and tested your Pulsar Functions, you will need to deploy them to a Pulsar cluster. Pulsar Functions were built to support multiple deployment scenarios. Currently there are two ways to run Pulsar Functions:

  • Local Run Mode: If you run a Pulsar Function in this mode, it will run on the machine from which the command was issued (e.g. your laptop or an edge node). Here’s an example local run command:
      $ bin/pulsar-admin functions localrun \
         --py myfunc.py \
         --className myfunc.SomeFunction \
         --inputs input-topic-1 \
         --outputs output-topic-1
  • Cluster Mode: When you run a Pulsar Function in cluster mode, the function code will be uploaded to a broker in a Pulsar cluster and run alongside the broker rather than in your local environment. You can run a function in cluster mode using the create command as shown below, executed on the Pulsar broker node.
      $ bin/pulsar-admin functions create \
         --jar target/my-functions.jar \
         --className org.example.functions.MyFunction \
         --inputs input-topic-1 \
         --outputs output-topic-1 \
         --parallelism 4 \
         --cpu 2 \
         --ram 8589934592 \
         --disk 10737418240

The command above will launch 4 instances of the org.example.functions.MyFunction class on the Pulsar broker, each with 2 CPU cores, 8GB of RAM, and 10GB of disk space. (Note that the RAM and disk settings are specified in bytes, and the CPU and disk settings are only enforced in Docker environments.)

Lastly, there is a way to provide user configuration properties when creating a Pulsar Function, which is useful when you need to reuse a function. We pass in a collection of key-value pairs in the command below by specifying a JSON string for the --userConfig property. These values will be accessible at runtime via the Context object for Pulsar Functions that utilize the Pulsar Functions SDK, which we will examine in the next section.

$ bin/pulsar-admin functions create \
   --jar target/my-functions.jar \
   --className org.example.functions.MyFunction \
   --inputs input-topic-1 \
   --outputs output-topic-1 \
   --parallelism 4 \
   --cpu 2 \
   --ram 8589934592 \
   --disk 10737418240 \
   --userConfig '{"key-1": "value-1", "key-2": "value-2"}'

Best Practices for Using the Apache Pulsar Functions SDK

The additional Context object defined in both the Java and Python SDKs provides a wide variety of information and functionality to the function, including the ability to persist intermediate results that can be used to provide stateful event processing. The following is a sample of the information contained within the context object:

  • The name and ID of the Pulsar Function
  • The message ID of each message. Each Pulsar message is automatically assigned an ID.
  • The name of the topic on which the message was sent
  • The names of all input topics as well as the output topic associated with the function
  • The name of the class used for SerDe
  • The tenant and namespace associated with the function
  • The ID of the Pulsar Functions instance running the function
  • The version of the function
  • The logger object used by the function, which can be used to create function log messages
  • Access to arbitrary user config values supplied via the CLI
  • An interface for recording metrics

In this section we will cover a few usage patterns that leverage these features of the Context object.

Best Practice 1: Dynamic Configuration

When you run or update Pulsar Functions created using the SDK, you can pass arbitrary key/values to them via the command line with the --userConfig flag. Key/values must be specified as JSON. Here’s an example of a function creation command that passes a user config key/value to a function:

$ bin/pulsar-admin functions create \
   --name word-filter \
   --userConfig '{"filter": "$.Sensors[?(@.Type == \"Temp\")]"}' \
    # Other function configs

This feature allows us to write generic functions that can be used multiple times, but with a slightly different configuration. For instance, let’s say you want to write a function that filters JSON events based on a JSON path expression. When an event arrives, the contents are compared to the configured expression, and those entries that don’t match are filtered out.

Obviously the behavior of this function depends entirely upon the JSON path expression it is filtering on. To make the function more reusable, the Pulsar SDK lets us defer specifying this value until deployment time.

As we can see in the example below, the value of the JSON path filter to use isn’t known at compile time; instead, it is pulled from the context using the getUserConfigValueOrDefault method.

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import com.jayway.jsonpath.JsonPath;

public class JsonPathFilterFunction implements Function<String, String> {

   public String process(String input, Context context) throws Exception {
       // Get the filter expression from the user config (default to "$")
       String filter = context.getUserConfigValueOrDefault("filter", "$")
                         .toString();
       Object filtered = JsonPath.read(input, filter);
       return filtered.toString();
   }
}

Best Practice 2: Stateful Event Processors

Stateful event processors require a memory of prior events in order to generate their output. The ability to store state is a key building block for processing multiple events. Within the Apache Pulsar Function framework this state information is stored in a dedicated key-value store built on top of Apache BookKeeper. The Pulsar SDK provides access to the state information via the Context object.

Figure 5: Apache Pulsar State Management

Let’s look at a simple example to illustrate the idea of a stateful agent. Suppose we have an application that receives temperature reading events from an IoT sensor, and we are interested in knowing the average temperature of the sensor. We can use an event processing agent to continuously recompute this value using the following function.

import java.nio.ByteBuffer;

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class AvgTempFunction implements Function<Float, Float> {

   public Float process(Float currentTemp, Context context) {
       // Increment and read the measurement counter
       context.incrCounter("num-measurements", 1);
       long n = context.getCounter("num-measurements");

       // Calculate the new average based on the old average and the count
       ByteBuffer state = context.getState("avg-temp");
       float oldAverage = (state == null) ? 0 : state.getFloat(0);
       float newAverage = (oldAverage * (n - 1) + currentTemp) / n;

       context.putState("avg-temp",
           ByteBuffer.allocate(Float.BYTES).putFloat(newAverage));
       return newAverage;
   }
}
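The incremental-average recurrence used above, new_avg = (old_avg * (n - 1) + x) / n, can be sanity-checked in plain Java against the batch average:

```java
public class AvgRecurrenceCheck {
    public static void main(String[] args) {
        // Hypothetical sequence of temperature readings
        float[] temps = {70.0f, 72.0f, 68.0f, 74.0f};

        // Incremental average, as the stateful function computes it
        float avg = 0;
        int n = 0;
        for (float t : temps) {
            n++;
            avg = (avg * (n - 1) + t) / n;
        }

        // Matches the batch average (70 + 72 + 68 + 74) / 4
        System.out.println(avg);  // 71.0
    }
}
```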

Best Practice 3: Void Functions

Pulsar Functions can publish results to one or more output topics, but this isn’t required. It is also possible to have functions that simply produce a log, write results to an external database, or just monitor the stream for anomalies. The following is a function that simply logs each event it receives.

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.slf4j.Logger;

public class LogFunction implements Function<String, Void> {

    public Void process(String input, Context context) throws Exception {
        Logger LOG = context.getLogger();
        LOG.info("Received {}", input);
        return null;
    }
}

When using Java functions in which the return output type is Void, the function must always return null. For functions that don’t have an output type of Void, you can return null when you don’t want to produce an output event, e.g. when you are applying a filter and don’t want an event to be processed.
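That filtering pattern can be sketched with the language-native interface (a hypothetical filter, purely illustrative):

```java
import java.util.function.Function;

// Hypothetical filter: pass positive readings through, drop the rest
public class PositiveFilterFunction implements Function<Integer, Integer> {
    @Override
    public Integer apply(Integer input) {
        // Returning null tells Pulsar not to publish an output event
        return (input > 0) ? input : null;
    }
}
```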

Best Practice 4: Processing Events from Multiple Input Topics

As we saw in Figure 4, Pulsar Functions can consume events from multiple topics, so let’s take a look at how one goes about writing a function that can do this:

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class MultiTopicFunction implements Function<String, String> {

    public String process(String input, Context context) throws Exception {
        String sourceTopic = context.getSourceTopic();
        if (sourceTopic.equals("topic-A")) {
           // parse as Topic A object
        } else if (sourceTopic.equals("topic-B")) {
           // parse as Topic B object
        } else if (sourceTopic.equals("topic-C")) {
           // parse as Topic C object
        }
        return input;
    }
}

As you can see from the code example, we first get the name of the input topic from the Context object itself, and then based upon that information parse/handle the event accordingly.

Best Practice 5: Metrics Collection

The Apache Pulsar SDK provides a metrics collection mechanism that you can use to record any user-defined metrics you choose. In the following example, we keep separate metrics to track the total number of times the function was called, and another to track the number of times it was called with invalid input. For instructions on reading and using metrics, see the Monitoring guide.

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class MetricFunction implements Function<Integer, Void> {

    public Void process(Integer input, Context context) throws Exception {
        context.recordMetric("invocation-count", 1);
        if (input < 0) {
           context.recordMetric("invalid-data", 1);
        }
        return null;
    }
}

Conclusion

In this blog post, we took a deep dive into the simple event processing capabilities provided by the Streamlio platform based on Apache Pulsar Functions.

We introduced the concept of event-based programming, and examined how many individual functions can be interconnected to form event processing networks. Lastly, we covered the Apache Pulsar Functions SDK and some of the best practices for utilizing the state management features provided by the Functions SDK.