Introduction to Heron

August 8, 2017

Karthik Ramasamy

Twitter open sourced Heron, its next-generation streaming engine, in 2016. Heron has been proven in production at Twitter for over three years and powers several real-time workflows and analytics use cases. Heron was developed out of the need for a system that can handle petabytes of data, improve developer productivity, simplify debugging, provide better efficiency and be easier to operate in a multi-tenant cluster environment. In this blog post, we will provide an introduction to Heron, its terminology, and its core concepts.

Heron’s data model

A streaming application written in Heron is called a topology. A topology is a directed acyclic graph (DAG) whose nodes represent data-computing elements and whose edges represent the streams of data flowing between those elements. There are two types of nodes: spouts, which connect to a data source and inject the data into a stream, and bolts, which process incoming data and emit data (perhaps to be further processed by other bolts). Bolts and spouts are the core components of a topology.

An example topology is shown in Figure 1. In this topology, there are two spouts, spout 1 and spout 2. Spout 1 taps into a data source and produces two data streams, one of which is consumed by bolt 1 and the other by bolt 2. Similarly, spout 2 connects to its data source and produces a data stream that is consumed by bolt 3. Bolt 1, bolt 2 and bolt 3 each process their incoming data stream and produce an output stream. Bolt 4 consumes the stream produced by bolt 1, while bolt 5 takes as input the two streams produced by bolt 2 and bolt 3.

Figure 1. An example Heron topology
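
The wiring described for Figure 1 can be written down as a plain adjacency list. The following is a minimal sketch in ordinary Java, not Heron API code; the class name Figure1Topology, the method names and the component labels such as "spout1" are made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Heron: the Figure 1 DAG as an adjacency list.
final class Figure1Topology {

    // Each component maps to the components that consume its output stream(s).
    static Map<String, List<String>> edges() {
        Map<String, List<String>> edges = new LinkedHashMap<>();
        edges.put("spout1", List.of("bolt1", "bolt2"));
        edges.put("spout2", List.of("bolt3"));
        edges.put("bolt1", List.of("bolt4"));
        edges.put("bolt2", List.of("bolt5"));
        edges.put("bolt3", List.of("bolt5"));
        edges.put("bolt4", List.of()); // no consumer described in the text
        edges.put("bolt5", List.of()); // no consumer described in the text
        return edges;
    }

    // Returns the components whose output the given component consumes.
    static List<String> upstreamOf(String component) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : edges().entrySet()) {
            if (entry.getValue().contains(component)) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(upstreamOf("bolt5")); // prints [bolt2, bolt3]
    }
}
```

Because every edge points from a producer to a consumer and no component feeds itself, directly or indirectly, the structure is a DAG.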

To give a concrete example of a topology, let us consider a streaming topology that counts the distinct set of words in a live stream of tweets. This topology is shown in Figure 2. The topology consists of a tweet spout, a parse tweet bolt and a word count bolt. The tweet spout taps into a tweet source and injects the tweet stream into the topology. The tweet stream is then consumed by the parse tweet bolt, which breaks each tweet into a set of distinct words. These words are output as a word stream, which is consumed by the word count bolt that counts the words using a map. The word count bolt periodically outputs several pairs, each consisting of a word and its count of occurrences.

Figure 2. Topology for counting distinct words on a tweet stream
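
The logic of the two bolts can be sketched in plain Java, outside of any streaming framework. This is only an illustration of the parse and count steps under simple assumptions (words are lower-cased and split on non-word characters); the class and method names are hypothetical and are not Heron APIs.

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch of the parse tweet and word count logic.
final class WordCountSketch {

    // Parse step: break one tweet into its set of distinct lower-case words.
    static Set<String> parseTweet(String tweet) {
        Set<String> words = new LinkedHashSet<>();
        for (String token : tweet.toLowerCase().split("\\W+")) {
            if (!token.isEmpty()) {
                words.add(token);
            }
        }
        return words;
    }

    // Count step: fold each word into a running map of word -> count.
    static void countWords(Set<String> words, Map<String, Integer> counts) {
        for (String word : words) {
            counts.merge(word, 1, Integer::sum);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new TreeMap<>();
        String[] tweets = {"Heron is fast", "heron is open source"};
        for (String tweet : tweets) {
            countWords(parseTweet(tweet), counts);
        }
        System.out.println(counts); // prints {fast=1, heron=2, is=2, open=1, source=1}
    }
}
```

In a real topology the map would live inside the word count bolt, and its contents would be emitted periodically rather than printed once.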

Component parallelism

If you have taken a database internals course, you will notice that a topology is very similar to the logical plan produced after parsing an SQL query. In fact, the term logical plan is sometimes used to refer to a topology. If you map each node of the topology to a single process, will the topology run? Yes, it will. However, what happens when the tweet source produces more data than a single tweet spout process can handle or, in the worst case, when that process consumes all the CPU on its machine? The same argument applies to the parse tweet bolt and the word count bolt when they receive more data than a single process can handle. We therefore need a mechanism to express how many instances of the tweet spout, the parse tweet bolt and the word count bolt should run in parallel.

Each node in the topology is associated with a numerical value that specifies the number of instances of it needed to run on CPUs in parallel. This is called parallelism. Typically, this is specified by the topology developer based on the incoming data rate, outgoing data rate and the degree of computation at every node. The topology shown in Figure 1 with parallelism associated with each node is shown in Figure 3. For example, bolt 1 has a parallelism of 2 while bolt 2 has a parallelism of 3.

Figure 3. Topology with parallelism specified for each node

Grouping

Once parallelism is specified, the next question is how the data is partitioned as it moves between the spout and bolt instances. For example, to which instance of bolt 3 should the data from an instance of spout 2 be sent? Heron provides the notion of grouping, which specifies how the data should be partitioned and which downstream instance it should be sent to. Heron supports the following groupings:

Grouping type | Description
Shuffle | Sends the data randomly to any instance of the downstream bolt. For example, any instance of spout 2 can send data to any instance of bolt 3.
Fields | Sends the data to a particular instance of the downstream bolt based on the value of a specified field. Essentially, a hash of that value determines the instance of the downstream bolt. For example, the data from bolt 2 might be hashed on a tuple attribute such as ‘word’. The resulting hash value decides which instance of bolt 5 the data is delivered to.
All | Sends the data to all the instances of the downstream bolt. For example, the data from an instance of bolt 3 is sent to all instances of bolt 5.
Global | Sends the data to a single instance. For example, the data from all instances of bolt 1 goes to only one instance of bolt 4, usually the instance with the lowest id.
Direct | Sends the data to the instance of choice. For example, an instance of bolt 3 can decide which instance of bolt 5 should receive the data.
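
To make the difference between the groupings concrete, here is a small sketch that models each grouping as a function returning the index of the target downstream instance. It is an illustration only: the class and method names are hypothetical, and the use of Java's hashCode for fields grouping is an assumption, since the hash function Heron actually uses is not described here. Direct grouping is left out because the target is simply whatever index the sender chooses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical sketch: each grouping picks target instance indices in [0, instances).
final class GroupingSketch {

    // Shuffle: any downstream instance, chosen at random.
    static int shuffle(Random random, int instances) {
        return random.nextInt(instances);
    }

    // Fields: hash the field value so equal values always reach the same instance.
    // Using hashCode() is an assumption made for this sketch.
    static int fields(Object fieldValue, int instances) {
        return Math.floorMod(fieldValue.hashCode(), instances);
    }

    // All: every downstream instance receives a copy.
    static List<Integer> all(int instances) {
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            targets.add(i);
        }
        return targets;
    }

    // Global: a single instance, here the one with the lowest id.
    static int global() {
        return 0;
    }

    public static void main(String[] args) {
        int instances = 3;
        System.out.println("shuffle -> " + shuffle(new Random(), instances));
        System.out.println("fields  -> " + fields("heron", instances));
        System.out.println("all     -> " + all(instances));
        System.out.println("global  -> " + global());
    }
}
```

The property that matters for a word count is the one fields grouping provides: every tuple carrying the same word lands on the same instance, so each per-instance map holds the complete count for its words.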

Topology execution

The application developer expresses the topology DAG, the component parallelism and the grouping information in one of the supported languages. In addition, the developer has the option to specify how many containers these instances should run in, as well as the resources for these containers in terms of CPU, memory and disk space. If no containers are specified, Heron will default to running all the instances in a single container. Once the topology is submitted, Heron packs the instances into the containers. Once the packing is complete, Heron talks to the scheduler to schedule the appropriate number of containers. An example of packing instances into 5 containers for the topology shown in Figure 1 is illustrated in Figure 4.

Figure 4. Example topology execution with 5 containers
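
The packing step can be pictured with a short sketch. The policy shown below is a simple round-robin assignment, chosen here purely for illustration; this post does not say which packing algorithm Heron applies, and the class name, method name and instance labels are hypothetical. The parallelism values for bolt 1 and bolt 2 are the ones given for Figure 3.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of packing component instances into containers.
final class PackingSketch {

    // Assigns every instance of every component to a container in round-robin order.
    static List<List<String>> pack(Map<String, Integer> parallelism, int containers) {
        List<List<String>> plan = new ArrayList<>();
        for (int c = 0; c < containers; c++) {
            plan.add(new ArrayList<>());
        }
        int next = 0;
        for (Map.Entry<String, Integer> component : parallelism.entrySet()) {
            for (int i = 0; i < component.getValue(); i++) {
                // Label each instance as <component>_<index>, e.g. bolt1_0.
                plan.get(next % containers).add(component.getKey() + "_" + i);
                next++;
            }
        }
        return plan;
    }

    public static void main(String[] args) {
        Map<String, Integer> parallelism = new LinkedHashMap<>();
        parallelism.put("bolt1", 2);
        parallelism.put("bolt2", 3);
        List<List<String>> plan = pack(parallelism, 2);
        for (int c = 0; c < plan.size(); c++) {
            System.out.println("container " + c + ": " + plan.get(c));
        }
    }
}
```

With two containers, this places bolt1_0, bolt2_0 and bolt2_2 in container 0 and bolt1_1 and bolt2_1 in container 1. A real packing algorithm would also take the CPU, memory and disk requirements of each instance into account.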

Putting it together: an example topology

Let us use an example topology to make our understanding concrete. The example is called the exclamation topology. It consists of a TestWordSpout that generates a stream of random words and an ExclamationBolt that consumes this word stream and appends exclamation marks to the words. The topology is visually illustrated in Figure 4 and the corresponding code for the topology is provided below.

package tutorial;

import backtype.storm.metric.api.GlobalMetrics;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.time.Duration;
import java.util.Map;
import java.util.Random;

/**
 * This is a sample topology that consumes a stream of words and appends exclamation marks to them.
 */
public final class ExclamationTopology {

    private ExclamationTopology() {}

    public static void main(String[] args) throws Exception {

        // Instantiate a topology builder to build the DAG
        TopologyBuilder builder = new TopologyBuilder();


        // Define the parallelism hint for the topology
        final int parallelism = 2;

        // Build the topology to have a 'word' spout and an 'exclaim' bolt.
        // Also, set the 'word' spout to have two instances
        builder.setSpout("word", new TestWordSpout(Duration.ofMillis(50)), parallelism);

        // Specify that the 'exclaim' bolt should consume from the 'word' spout using
        // shuffle grouping, running in four instances
        builder.setBolt("exclaim", new ExclamationBolt(), 2 * parallelism)
                .shuffleGrouping("word");

        // Create the config for the topology
        Config conf = new Config();

        // Set the run mode to be debug
        conf.setDebug(true);

        // Set the number of tuples to be in flight at any given time to be 10
        conf.setMaxSpoutPending(10);
        conf.setMessageTimeoutSecs(600);

        // Set JVM options to dump the heap when out of memory
        conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-XX:+HeapDumpOnOutOfMemoryError");

        // If the topology name is specified run in the cluster mode, otherwise run in
        // Simulator mode
        if (args != null && args.length > 0) {

            // Set the number of containers to be two
            conf.setNumWorkers(parallelism);

            // Submit the topology to be run in a cluster
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            System.out.println("Topology name not provided as an argument, running in simulator mode.");

            // Create the local cluster for simulation
            LocalCluster cluster = new LocalCluster();

            // Submit the topology to the simulated cluster
            cluster.submitTopology("test", conf, builder.createTopology());

            // Wait for it to run for 10 seconds
            Utils.sleep(10000);

            // Kill and shutdown the topology after the elapsed time
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }


    /**
     * Word Spout that outputs a random word among a list of words continuously
     */
    static class TestWordSpout extends BaseRichSpout {

        private static final long serialVersionUID = -3217886193225455451L;
        private SpoutOutputCollector collector;
        private String[] words;
        private Random rand;
        private final Duration throttleDuration;

        // Instantiate with no throttle duration
        public TestWordSpout() {
            this(Duration.ZERO);
        }

        // Instantiate with specified throttle duration
        public TestWordSpout(Duration throttleDuration) {
            this.throttleDuration = throttleDuration;
        }

        @Override
        @SuppressWarnings("rawtypes")
        public void open(
                Map conf,
                TopologyContext context,
                SpoutOutputCollector collector) {
            this.collector = collector;
            this.words = new String[]{"nathan", "mike", "jackson", "golda", "bertels"};
            this.rand = new Random();
        }

        // This method is called to generate the next tuple for the spout
        @Override
        public void nextTuple() {
            final String word = words[rand.nextInt(words.length)]; // Choose a random word
            this.collector.emit(new Values(word)); // Emit it to go to the next phase in the topology
            if (!this.throttleDuration.isZero()) {
                Utils.sleep(this.throttleDuration.toMillis()); // sleep to throttle back cpu usage
            }
        }

        // This method specifies the output field labels
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word")); // Here we mark the output tuple with the tag "word"
        }

    }

    /**
     * ExclamationBolt is a bolt that takes a word and outputs the same word with exclamation marks appended
     */
    static class ExclamationBolt extends BaseRichBolt {

        private static final long serialVersionUID = 1184860508880121352L;
        private long nItems;
        private long startTime;
        private OutputCollector collector;

        @Override
        @SuppressWarnings("rawtypes")
        public void prepare(Map conf,
                            TopologyContext context,
                            OutputCollector collector) {
            this.nItems = 0;
            this.collector = collector;
            this.startTime = System.currentTimeMillis();
        }

        @Override
        public void execute(Tuple tuple) {
            // For this example, we act only on every 100,000th tuple read from the "word" spout
            if (++nItems % 100000 == 0) {
                long latency = System.currentTimeMillis() - startTime;
                // Generate the appended word, emit it as output and log it to the console
                final String appendedWord = tuple.getString(0) + "!!!";
                this.collector.emit(new Values(appendedWord));
                System.out.println(appendedWord);
                // Print out and record basic latency measurements in the console and metric registry
                System.out.println("Bolt processed " + nItems + " tuples in " + latency + " ms");
                GlobalMetrics.incr("selected_items");
            }
        }

        // This method specifies the output field labels
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Here we mark the output tuple with the tag "modified_word"
            declarer.declare(new Fields("modified_word"));
        }
    }
}

Conclusion

To conclude, this blog post provided a gentle introduction to some Heron terminology and concepts. We looked at how parallelism determines the number of instances of each component and how grouping determines the way data is partitioned as it moves between spouts and bolts. Next, we looked at the underlying topology execution, and finally we walked through an example topology to see how everything fits together. In the next blog post, we will examine in detail how the data flows in the topology during execution.