Event Processing Design Patterns with Pulsar Functions

October 25, 2018

David Kjerrumgaard

In our previous post we laid the groundwork for an event-based programming model based on Apache Pulsar Functions. In this post we’ll consider some common real-time streaming patterns and how to implement them.

Pattern 1: Dynamic Routing

Let’s review how we would go about implementing content-based routing using Apache Pulsar Functions. Content-based routing is an integration pattern that has been around for years and is commonly used in event hubs and messaging frameworks. The basic idea is that the content of each message is inspected and then is routed to various destinations based upon value(s) found or not found in the content.

Figure 1: Content-based routing function
Figure 1: Content-based routing function

The following example uses the Apache Pulsar SDK, which allows the user to configure three different values:

  • The regular expression used to decide where route the messages
  • The topic name for the messages that match the regex
  • The topic name for those that don’t.

While this example isn’t particularly exciting, it does demonstrate the fact that Pulsar Functions can dynamically decide where to send events based on the function logic, which is a very powerful tool.

  import java.util.regex.*;
  import org.apache.pulsar.functions.api.Context;
  import org.apache.pulsar.functions.api.Function;

  public ContentBasedRoutingFunction implements Function<String, String> {

     String process(String input, Context context) throws Exception {
         String regex = context
             .getUserConfigValue(regex).toString();
         String matchedTopic = context
             .getUserConfigValue(matched-topic).toString();
         String unmatchedTopic = context
             .getUserConfigValue(unmatched-topic).toString();

         Pattern p = Pattern.compile(regex);
         Matcher m = p.matcher(input);
         if (m.matches()) {
           context.publish(matchedTopic, input);
         } else {
           context.publish(unmatchedTopic, input);
         }
     }
  }

Pattern 2: Filtering

The filtering pattern should be applied when you want to exclude the majority of events on a topic by retaining only those events that meet a given criteria. This can be particularly useful if you are looking only for events of interest, e.g. credit card purchases over a certain dollar amount, ERROR messages in a collection of log files, or a sensor reading that exceeds a certain threshold. (See pattern 4)

Let’s consider a use case in which you are monitoring an event stream of credit card transactions and attempting to detect any fraudulent or suspicious activity. Given the sheer volume of transactions and a limited window of time in which to make an “approve” / “disapprove” decision, you must first filter out transactions that have ‘risky’ characteristics such as cash advances, large dollar purchases, etc.

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import com.company.creditcard.Purchase;

public class FraudFilter implements Function<Purchase, Purchase> {

    Purchase process(Purchase p, Context context) throws Exception {
         if (p.getTransactionType() == CASH ADVANCE) ||
             p.getAmount > 500.00) {
            return p;
         }
        return null;
    }
}

One approach to this would be to implement a filter that identified these characteristics and routed only those transactions to a separate topic for further evaluation. Any credit card purchase that passed this filter could be routed to a “Potential Fraud” topic for further evaluation, and all others would be filtered out and ignored.

Figure 2: Filtering function example
Figure 2: Filtering function example

In Figure 2 above, we show the behavior of the FraudFilter function based upon three separate purchase objects. The first purchase meets the given criteria, and is passed on the to “Possible Fraud” topic for further evaluation, while the second and third purchases don’t meet the fraud criteria and so are filtered out (not passed to the “possible fraud” filter).

Pattern 3: Transformation

This category of patterns is used to convert events from one type to another or to add, remove, or modify the values of the input event.

Projection

This pattern is similar to the project operator in relational algebra; it selects a subset of the attributes of the input event and creates an output event containing only those attributes. It can be used to remove sensitive fields from an event or to reduce the event size down to the bare minimum attributes required. Figure 3 below shows one application of this pattern in which the incoming social security numbers are ‘masked’ before publishing the records to the down-stream topic.

Figure 3: Sensitive Field Function
Figure 3: Sensitive Field Function

Enrichment

This pattern is used to add data to the output event that was not present in the input attributes. Typical enrichments involve some type of lookup into reference data based on some key value within the input event. The following example shows how a geographic location can be added to the output event based upon an IP address contained within the input event.

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import com.company.creditcard.Purchase;
import com.company.services.GeoService;

public class IPLookup implements Function<Purchase, Purchase> {
    Purchase process(Purchase p) throws Exception {
      Geo g = GeoService.getByIp(p.getIPAddress());
      // By default, these fields are blank, so we just modify the object
      p.setLongitude(g.getLon());
      p.setLatitiude(g.getLat());
      return p;
    }
}

Split

In this pattern, the event processor takes a single input event and breaks it into several output events. This pattern is useful when the input event is a batch containing multiple individual events, such as entries in a log file, that you want to process individually. This is illustrated in the example below, which splits the input by newline characters and publishes each line to the configured output topic.

Figure 4: Splitter function example
Figure 4: Splitter function example

The implementation of such a function would be as follows:

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class Splitter implements Function<String, String> {

    String process(String s, Context context) throws Exception {
       Arrays.asLists(s.split(“\\R).forEach(line ->
            context.publish(context.getOutputTopic(), line));
       return null;
    }
}

Pattern 4: Alerts and Thresholds

This pattern detects a condition and generates alerts based on a condition (e.g. alert on high temperature). These alerts can be based on a simple value or more complex conditions such as rate of increase, sustained shift in level, etc.

In the following example, based on user-configured parameters for the threshold value, e.g. 100.00, 38.7, etc. and the email address to receive the alert notification. When the function receives a sensor event that exceeds the configured threshold, an email is sent.

import javax.mail.*;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public SimpleAlertFunction implements Function<Sensor, Void> {
   Void process(Sensor sensor, Context context) throws Exception {
       Double threshold = context
           .getUserConfigValue(threshold).toString();
       String alertEmail = context
           .getUserConfigValue(alert-email).toString();

      if (sensor.getReading() >= threshold) {
         Session s = Session.getDefaultInstance();
         MimeMessage msg = new MineMessage(s);
         msg.setText(Alert for Sensor: + sensor.getId());
         Transport.send(msg);
      }
      return null;
  }
}

Below is an example of a stateful function that generates an alert based on the rate of increase in a specific sensor reading. Access to previous sensor readings is required in order to make the decision whether to generate an alert or not.

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public ComplexAlertFunction implements Function<Sensor, Void> {

  Void process(Sensor sensor, Context context) throws Exception {
      Double threshold = context
           .getUserConfigValue(threshold).toString();
      String alertTopic = context
           .getUserConfigValue(alert-topic).toString();

      // Get previous & current metric values
      Float previous = context.getState(sensor.getId() + -metric);
      Long previous_time = context.getState(sensor.getId() + -metric-time);
      Float current = sensor.getMetric();
      Long current_time = sensor.getMetricTime();

      // Calculate Rate of change & compare to threshold.
      Double rateOfChange = (current-previous) /
                           (current_time-previous_time);
      if (abs(rateOfChange) >= threshold) {
         // Publish the sensor ID to the alert topic for handling
         context.publish(alertTopic, sensor.getId());
      }

      // Update metric values
      context.putState(sensor.getId() + -metric, current);
      context.putState(sensor.getId() + -metric-time, current_time);
  }
}

Here we are using the Apache Pulsar Functions state management feature to keep just the previous metric reading and time and to prepend the sensor ID to these values (the sensor ID is needed because we will process metrics from multiple sensors). For simplicity’s sake, we are assuming that the events arrive in the proper order, i.e. always the most recent reading with no out-of-sequence readings.

It is also worth noting that this time we are forwarding the sensor IDs to a dedicated alert topic for further processing instead of just sending an email. This will allow us to perform additional enrichment processing (via Pulsar Functions) on the event such as performing a lookup to get the sensor’s geo-location, which we can then use to notify the proper personnel.

Figure 5: Complex alerting example
Figure 5: Complex alerting example

Pattern 5: Simple Counting and Counting with Windows

This category of patterns includes aggregate functions that take as input a collection of events and produces a single desired output event by applying a function over the input events. Examples of aggregation functions are sum, average, maximum, minimum, and percentiles.

Figure 6: Word count example
Figure 6: Word count example

The following implementation of the classic “word count” problem using Pulsar Functions is one such example. It calculates a sum of the occurrences of every individual word published to given topic.

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public WordCountFunction implements Function<String, Void> {

   Void process(String s, Context context) throws Exception {
      Arrays.asLists(s.split(“\\.).forEach(word -> context.incrCounter(word, 1));
      return null;
   }
}

Given the never-ending nature of streaming data sources, performing these aggregations indefinitely doesn’t make much since, and typically these calculations are performed over windows of data, (e.g. failure count last hour).

Figure 7: A window of data within an event stream
Figure 7: A window of data within an event stream

A window of data represents a finite subset of the event stream, as depicted in Figure 7. But how do we go about defining the boundaries of a data window? There are two common attributes used to define windows:

  • Trigger Policy: Controls when our function code is executed, or triggered. These are the rules that the Apache Pulsar Function framework uses to notify our code that it is time to process all of the data collected in the window.

  • Eviction Policy: Controls the amount of data retained in the window. These are the rules used to decide if a data element should be evicted from the window.

Both of these policies are driven by either time or the quantity of data in the window. Let’s explore the distinction between these two policies and how they work in concert with one another. While there are a variety of windowing techniques, the most prominent ones used in practice are tumbling and sliding windows.

Tumbling Windows

The eviction policy for tumbling windows is ALWAYS based on the window being full. Therefore you only need to specify the trigger policy you want to use, either count-based or time-based. Let’s explore the behavior of a count-based tumbling window to get a better understanding of how they work.

In the first example shown in Figure 8, the trigger policy is set to 2, which means that at the point in time at which two items are in the window, the trigger will fire and our Pulsar Function code will be executed, and the window will be cleared. This behavior is irrespective of time, whether it takes 5 seconds or 5 hours for the window count to reach two items doesn’t matter, only when the count reaches two matters.

Figure 8: Count-based tumbling window
Figure 8: Count-based tumbling window

Let’s contrast this with the behavior of a time-based tumbling window with a duration configured to be 10 seconds. When the 10 second time interval has elapsed, our function code is triggered regardless of how many events are in the window. Thus, the first window contained seven events, while the second one contained only three.

Figure 9: Time-based tumbling window
Figure 9: Time-based tumbling window

Sliding Windows

The sliding window technique defines a window length, which sets the eviction policy to limit the amount of data retained for processing, and a sliding interval the defines the trigger policy. Both of these policies can be defined in terms of time (duration) or length (number of data elements).

In Figure 10 below, the window length is configured to be 2 seconds meaning that any data older than 2 seconds will be evicted and not used in the calculation. The sliding interval is configured to be 1 second, which means that every second our Pulsar function code would be executed and we would be able to process the data within the entire window length.

Figure 10: Sliding window state at time = 2
Figure 10: Sliding window state at time = 2

In the previous example, both the eviction and trigger policies were defined in terms of time, however it is also possible to define one or both of these in terms of length instead.

Implementing either of these types of windowing functions in Pulsar Functions is straightforward and only requires that you specify a java.util.Collection as the input type, as shown below, along with specifying the appropriate window configuration property in the –userConfig flag when you create the function.

The window configuration parameters shown below enable you to implement all four variations of the time windows introduced earlier when used in the proper combinations. The proper combinations are listed in Table 1:

  • “–windowLengthCount”, The number of messages per window
  • “–windowLengthDurationMs”, The time duration of the window in milliseconds
  • “–slidingIntervalCount”, The number of messages after which the window slides
  • “–slidingIntervalDurationMs”, The time duration after which the window slides
Table 1: Pulsar window function settings matrix
Table 1: Pulsar window function settings matrix

Conclusion

In this blog post we presented several common stream processing pattern implementations using Apache Pulsar Functions. These processing patterns include content-based routing, filtering, transformations, alerts, and simple counting applications. We also covered basic windowing concepts and explored the windowing capabilities provided by Apache Pulsar Functions.