Introducing Heron Streamlets

A new higher-level API for creating Heron topologies.

Jerry Peng and Luc Perkins

December 12, 2017

The streaming data space is changing rapidly before our eyes. The demands placed on stream processing systems are becoming both more intensive, as organizations begin to realize the value of real-time data, and more diverse, as new streaming use cases emerge. Heron, the best-of-breed stream processing engine open sourced by Twitter in 2016, has continuously risen to the many challenges presented by the real-time space.

Now, Heron is making a big push to make creating Heron topologies easier and more developer-friendly than ever before by introducing an API that centers around a new concept: Streamlets. Streamlets enable developers to write Heron topologies using a high-level interface inspired by concepts from functional programming.

Heron already offers a lower-level API called the Topology API for creating processing topologies. The Topology API is very powerful and still fully supported in Heron, but the Streamlet API will provide developers with much more flexibility when using Heron and will make some topologies easier to create than ever before.

The Streamlet API is currently available for Java, but support for other languages will be added soon. This blog post covers only the Java API.

What are Streamlets?

Streamlets are typed streams of data that can be easily manipulated (transformed into new Streamlets, unified with other Streamlets, and so on). Figure 1 below shows an example Streamlet:

Figure 1. An example integer Streamlet

In this example, a randomInt function produces a source Streamlet (consisting of random integers between 1 and 100) that supplies data to a Heron processing graph. That initial Streamlet then undergoes a series of transformations via operators:

  • First, a filter operator removes all integers in the Streamlet that are less than or equal to 30.
  • Then, a map operator adds 15 to each remaining integer, producing the final result Streamlet.
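To make the semantics of these two operators concrete, here is a minimal sketch that applies the same filter and map logic to a small, fixed list of integers using plain java.util.stream rather than Heron. It only illustrates what happens to each element; it is not how Heron executes a topology, and the class and method names are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical class: models the example's operators on a finite batch (not Heron code)
public class FilterMapSketch {
    // Drop integers <= 30, then add 15 to each remaining integer
    static List<Integer> process(List<Integer> input) {
        return input.stream()
                .filter(i -> i > 30)
                .map(i -> i + 15)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> sample = List.of(12, 31, 30, 100, 57);
        System.out.println(process(sample)); // prints [46, 115, 72]
    }
}
```

Note that 30 is dropped because the filter keeps only integers strictly greater than 30.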

The resulting, transformed Streamlet can now be sent to a sink, which is where the processing graph terminates. A sink can be any number of things; for example, it can write results to a database or to a pub-sub messaging topic.

Figure 2 below shows two source-operator-sink examples, one generic, one more specific:

Figure 2. Sources, operators, and sinks in a processing graph

Java example

Below is a complete implementation of the Streamlet processing graph described in the section above using the Streamlet API for Java:

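import java.util.concurrent.ThreadLocalRandom;

import com.twitter.heron.streamlet.Builder;
import com.twitter.heron.streamlet.Config;
import com.twitter.heron.streamlet.Runner;
import com.twitter.heron.streamlet.Streamlet;

public final class IntegerProcessingTopology {
    // Returns a random integer between min and max (inclusive)
    private static int randomInt(int min, int max) {
        return ThreadLocalRandom.current().nextInt(min, max + 1);
    }

    public static void main(String[] args) throws Exception {
        Builder processingGraphBuilder = Builder.createBuilder();
        // Source: a stream of random integers between 1 and 100
        Streamlet<Integer> sourceStreamlet = processingGraphBuilder
                .newSource(() -> randomInt(1, 100));
        // Keep only integers greater than 30
        Streamlet<Integer> filteredStreamlet = sourceStreamlet
                .filter(i -> i > 30);
        // Add 15 to each remaining integer
        Streamlet<Integer> mappedStreamlet = filteredStreamlet
                .map(i -> i + 15);
        // Sink: log each result
        mappedStreamlet.log();

        Config topologyConfig = Config.defaultConfig();
        new Runner().run("integer-processing-example", topologyConfig, processingGraphBuilder);
    }
}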

In the Java example above, each Streamlet transformation resulted in a new, explicitly named Streamlet object. But you can also chain together Streamlet operations in a more fluid fashion:

Builder processingGraphBuilder = Builder.createBuilder();

processingGraphBuilder
        .newSource(() -> randomInt(1, 100))
        .filter(i -> i > 30)
        .map(i -> i + 15)
        .log();

Streamlet operations

We won’t cover all of the many operations that are available for Streamlets in this blog post, but we’ll mention a few here:

Operation   Description
map         Returns a new Streamlet by applying a user-supplied mapping function to each element
filter      Returns a new Streamlet containing only the elements for which a user-supplied filtering function returns true
union       Combines two Streamlets of the same type (for example, two integer Streamlets) into a single Streamlet
join        Joins two Streamlets into a single Streamlet on key, within a time window, and in accordance with a user-supplied join function
toSink      Terminates the processing graph by doing something with the results, for example writing to a pub-sub messaging topic or saving to a database
log         Logs the results of a processing graph to stdout (log operations are a special case of sinks)

You can see a full list of Streamlet operations in the Heron documentation.
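Heron applies union and join continuously over unbounded streams. The sketch below models only their semantics on small finite batches in plain Java, assuming tumbling (non-overlapping) time windows and an inner join. The Event class and all method names are hypothetical and are not part of the Streamlet API; in a live stream, the elements of a union would also interleave rather than simply being concatenated.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical class: models union and windowed join on finite batches (not Heron code)
public class UnionJoinSketch {
    // Hypothetical keyed, timestamped element (not a Heron type)
    static final class Event {
        final String key;
        final int value;
        final long timeMs;

        Event(String key, int value, long timeMs) {
            this.key = key;
            this.value = value;
            this.timeMs = timeMs;
        }
    }

    // union: combine two batches of the same element type into one
    static <T> List<T> union(List<T> a, List<T> b) {
        List<T> out = new ArrayList<>(a);
        out.addAll(b);
        return out;
    }

    // join: inner join on key, pairing only elements that fall into the same
    // tumbling window, and combining each pair with a user-supplied function
    static <R> List<R> join(List<Event> left, List<Event> right, long windowMs,
                            BiFunction<Event, Event, R> joinFn) {
        List<R> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean sameWindow = l.timeMs / windowMs == r.timeMs / windowMs;
                if (sameKey && sameWindow) {
                    out.add(joinFn.apply(l, r));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(union(List.of(1, 2), List.of(3))); // prints [1, 2, 3]

        List<Event> clicks = List.of(new Event("a", 1, 100), new Event("b", 2, 1500));
        List<Event> views = List.of(new Event("a", 10, 900), new Event("b", 20, 500));
        // With 1-second windows, only key "a" lands in the same window on both sides
        List<String> joined = join(clicks, views, 1000, (l, r) -> l.key + ":" + (l.value + r.value));
        System.out.println(joined); // prints [a:11]
    }
}
```

The key "b" elements are not joined because one falls in the first one-second window and the other in the second.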

Partitioned processing

One major advantage of Heron in general is that it enables you to easily parallelize your stream processing logic by distributing operations across a user-specifiable number of containers (Heron currently uses cgroups). Both the Topology API and the new Streamlet API allow for this, but the Streamlet API makes it a little bit easier. Here’s a Java example:

processingGraphBuilder
        .newSource(() -> randomInt(1, 100))
        .setNumPartitions(2)
        .filter(i -> i > 30)
        .setNumPartitions(2)
        .map(i -> i + 15)
        .setNumPartitions(3)
        .log()
        .setNumPartitions(1);

Notice how each “step” in the graph has a number of partitions assigned to it. Compare the two topologies in Figure 3 below. In Topology 1 on the left, each operation has a parallelism of one (which is the default), while Topology 2 represents the topology created in the code example directly above.

Figure 3. From processing graph to topology

With the Streamlet API, changing the parallelism of the resulting spouts-and-bolts topology requires just a simple setter, setNumPartitions, applied to each operation. Heron then automatically performs the work of stitching the spouts and bolts together and ensuring that data is properly passed between them.
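As a rough mental model of what the setNumPartitions calls imply, the plain-Java sketch below assumes that each partition runs as one instance and that elements are spread across a step's partitions round-robin, similar in spirit to a shuffle grouping. Heron's actual routing between spouts and bolts may differ, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical class: a mental model of partitioning (not Heron code)
public class PartitionSketch {
    // Assumption for illustration: distribute elements across numPartitions
    // instances in round-robin order
    static List<List<Integer>> roundRobin(List<Integer> input, int numPartitions) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < input.size(); i++) {
            partitions.get(i % numPartitions).add(input.get(i));
        }
        return partitions;
    }

    // Total instances if each partition of each step runs as one instance
    static int totalInstances(int... partitionsPerStep) {
        int total = 0;
        for (int n : partitionsPerStep) {
            total += n;
        }
        return total;
    }

    public static void main(String[] args) {
        // The map step in the example above has 3 partitions
        System.out.println(roundRobin(List.of(46, 115, 72, 81, 60), 3));
        // prints [[46, 81], [115, 60], [72]]

        // Steps: source, filter, map, log
        System.out.println(totalInstances(1, 1, 1, 1)); // Topology 1: prints 4
        System.out.println(totalInstances(2, 2, 3, 1)); // Topology 2: prints 8
    }
}
```

Under these assumptions, Topology 2 runs eight instances instead of four, and Heron (rather than the developer) is responsible for wiring them together.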

Topology API example

For the sake of comparison, let’s have a look at the scaffolding for a topology created using the Topology API. This topology contains just one spout and one bolt:

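import java.util.Map;

import com.twitter.heron.api.Config;
import com.twitter.heron.api.HeronSubmitter;
import com.twitter.heron.api.bolt.BaseBasicBolt;
import com.twitter.heron.api.bolt.BasicOutputCollector;
import com.twitter.heron.api.spout.BaseRichSpout;
import com.twitter.heron.api.spout.SpoutOutputCollector;
import com.twitter.heron.api.topology.OutputFieldsDeclarer;
import com.twitter.heron.api.topology.TopologyBuilder;
import com.twitter.heron.api.topology.TopologyContext;
import com.twitter.heron.api.tuple.Tuple;

public final class TestTopology {
    public static class TestSpout extends BaseRichSpout {
        @Override
        public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
            // Specify what happens when the spout is initialized
        }

        @Override
        public void nextTuple() {
            // Specify how the spout emits its next tuple
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Specify output fields for the spout
        }
    }

    public static class TestBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            // Specify how each incoming tuple is processed
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Specify output fields, if any, for the bolt
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // One spout instance and one bolt instance, wired with a shuffle grouping
        builder.setSpout("test-spout", new TestSpout(), 1);
        builder.setBolt("test-bolt", new TestBolt(), 1).shuffleGrouping("test-spout");
        Config conf = new Config();
        HeronSubmitter.submitTopology("test-topology", conf, builder.createTopology());
    }
}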

As you can see, the Topology API does allow for a more fine-grained approach to writing topology logic, but at the cost of a lot of extra boilerplate. The choice is up to you. If you need the Topology API, it’s still 100% supported, and topologies created using both APIs can run side by side in the same Heron installation. But if you’re looking for concision and think about stream processing in a more functional and less procedural way, then the Streamlet API may offer big wins.

More examples

You can see a wide variety of Streamlet API examples for Java on GitHub.

We’ve also created a “starter pack” repository that you can use to experiment on your own. You can clone it and build the example topology like this:

$ git clone https://github.com/streamlio/heron-java-streamlet-api-example
$ cd heron-java-streamlet-api-example
# Build the existing example topology
$ mvn assembly:assembly

To create your own new topology:

$ touch src/main/java/io/streaml/heron/streamlet/MyStreamletTopology.java

The Streamlet API: evolving beyond the Storm model

The Storm spouts-and-bolts model for creating topologies is a powerful one that has ably served the needs of massive organizations (and not just Twitter!) for many years. We suspect that it will remain in use for many years into the future.

But we’re also very excited to see the emergence of the Streamlet API because we feel the ground shifting around us. With functional programming becoming ever more well established, the Streamlet API feels more in tune with the current Zeitgeist in software development. It’s lighter weight, more intuitive, and carries less cognitive overhead than the Topology API. And, frankly, we think it’s just more fun this way. When we began experimenting with the new API, our imaginations lit up in a way that we’d never experienced with any other API for streaming data. If spouts and bolts work for you, stick with them. But we think you’d be remiss not to give the Streamlet API a spin.

Keep in touch

If you’re interested in Heron, you may want to participate in:

And for more information about the Heron project in general, go to heronstreaming.io and follow us on Twitter @heronstreaming.