Heron going exactly-once

May 5, 2017

Sanjeev Kulkarni

In May 2016, Twitter open sourced Heron, its next-generation streaming engine. Heron has been proven in production at Twitter for over three years, powering real-time processes, workflows, and analytics at massive scale. Heron was developed to replace Storm, providing a system that scales better, improves developer productivity, is easier to debug, is more efficient, and is easier to operate in a multi-tenant cluster environment.

Most streaming systems provide some set of guarantees for the data they process. These guarantees fall into three general categories:

  • At-most-once: The system processes the data using a best-effort strategy. It’s possible that some of the data injected into the system may be lost due to some combination of processing, machine, and network failures.
  • At-least-once: The data injected into the system is guaranteed to be processed at least once. It’s possible, however, that the data is processed more than once in the presence of failures, retries, or other contingencies.
  • Exactly-once: The streaming system ensures that the data it receives is processed exactly once, even in the presence of various failures, leading to accurate results.
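To make the difference between the three categories concrete, here is a toy sketch in plain Java. It is not Heron code: the class GuaranteeDemo, its methods, and the "failed" set are made up for illustration, and the exactly-once variant uses simple deduplication by ID purely to show the observable result, not the checkpoint-based mechanism described later in this post.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy illustration of delivery guarantees (hypothetical, not Heron code). */
final class GuaranteeDemo {

    /** At-most-once: a tuple hit by a failure is lost and never retried. */
    static int atMostOnce(List<String> ids, Set<String> failed) {
        int count = 0;
        for (String id : ids) {
            if (!failed.contains(id)) {
                count++;
            }
        }
        return count;
    }

    /** At-least-once: a failed tuple is replayed, so it may be counted twice. */
    static int atLeastOnce(List<String> ids, Set<String> failed) {
        int count = 0;
        for (String id : ids) {
            count++;                      // first delivery is processed
            if (failed.contains(id)) {
                count++;                  // ack was lost, so the replay is processed again
            }
        }
        return count;
    }

    /** Exactly-once (as observed): replays do not change the result. */
    static int exactlyOnce(List<String> ids, Set<String> failed) {
        Set<String> seen = new HashSet<>();
        int count = 0;
        for (String id : ids) {
            int deliveries = failed.contains(id) ? 2 : 1;
            for (int i = 0; i < deliveries; i++) {
                if (seen.add(id)) {       // a duplicate delivery is skipped
                    count++;
                }
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("a", "b", "c", "d");
        Set<String> failed = new HashSet<>(Arrays.asList("c"));
        System.out.println(atMostOnce(ids, failed));   // 3
        System.out.println(atLeastOnce(ids, failed));  // 5
        System.out.println(exactlyOnce(ids, failed));  // 4
    }
}
```

With four input tuples and one failure, only the exactly-once variant reports the true count of four.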

Most production Heron streaming applications have so far relied on at-most-once or at-least-once guarantees. As the adoption of Heron expands across enterprises and industries, there is a need to support exactly-once as well. Streamlio collaborated with Twitter and Microsoft to develop and incorporate exactly-once into Heron. In this blog post, we outline some of the requirements and provide a preview of this upcoming feature.

Heron overview

To better understand the support for exactly-once, we first give an overview of Heron concepts and the data model. A streaming job in Heron is called a topology. A topology is a directed acyclic graph whose nodes are data computing elements and whose edges represent streams of data flowing between those elements. There are two types of nodes: spouts, which connect to a data source and inject data into a stream as tuples, and bolts, which process incoming tuples and emit outgoing tuples. A Heron topology can be loosely thought of as a logical plan. To run it on physical machines, the developer needs to specify the number of instances of each spout and bolt. These instances are packed into several containers, and the containers run on a cluster of machines. An example Heron topology and its physical plan are shown in Figures 1 and 2.

Figure 1: An example Heron topology
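The step from logical plan to physical plan described above can be sketched as a small packing routine. This is a minimal illustration that assumes a simple round-robin placement; the post does not say which packing strategy Heron uses, and the class PackingSketch and its methods are hypothetical names, not Heron APIs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical round-robin packing of component instances into containers. */
final class PackingSketch {

    /**
     * @param parallelism   component name -> number of instances requested
     * @param numContainers number of containers to spread instances over
     * @return one list of instance names per container
     */
    static List<List<String>> pack(Map<String, Integer> parallelism, int numContainers) {
        List<List<String>> containers = new ArrayList<>();
        for (int i = 0; i < numContainers; i++) {
            containers.add(new ArrayList<>());
        }
        int next = 0;  // index of the container that receives the next instance
        for (Map.Entry<String, Integer> e : parallelism.entrySet()) {
            for (int i = 0; i < e.getValue(); i++) {
                containers.get(next % numContainers).add(e.getKey() + "-" + i);
                next++;
            }
        }
        return containers;
    }

    /** Two spout instances and three bolt instances spread over two containers. */
    static List<List<String>> example() {
        Map<String, Integer> parallelism = new LinkedHashMap<>();
        parallelism.put("word", 2);
        parallelism.put("consumer", 3);
        return pack(parallelism, 2);
    }

    public static void main(String[] args) {
        // prints [[word-0, consumer-0, consumer-2], [word-1, consumer-1]]
        System.out.println(example());
    }
}
```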

Based on the anticipated use cases, we identified the following initial requirements for supporting exactly-once:

  • Addition of exactly-once should not impact the performance and functionality of at-most-once and at-least-once topologies in production.
  • Latency overhead for exactly-once topologies should be negligible when compared to running them without exactly-once guarantees.
  • Writing exactly-once topologies should be easy for topology developers. Heron should provide the minimal low-level APIs required for stateful processing and exactly-once, and higher-level APIs, such as built-in stateful window support, can then build on them.
  • State store to support exactly-once should provide an abstraction for supporting different types of storage.

Figure 2: Physical plan for the example Heron topology

State and stateful components

To support exactly-once, each spout or bolt needs a notion of state. State represents the results of computation accumulated over a period of time. For example, the state in a bolt might be the number of retweets for each of several tweets. State is defined as a key/value interface that extends the Java Map interface.

/**
* State represents the interface as seen by stateful bolts and
* spouts. In Heron, state gives a notional Key/Value interface
* along with the ability to iterate over the key/values
*/
public interface State <K extends Serializable,
                        V extends Serializable> extends Map<K, V> {
}
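Because State is just a Map with serializable keys and values, a minimal in-memory implementation can be sketched by extending HashMap. The State interface is repeated here so the snippet compiles on its own; InMemoryState and StateDemo are hypothetical names used only for this illustration, and the implementation Heron ships may look different.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Local copy of the interface shown above, so the sketch is self-contained.
interface State<K extends Serializable, V extends Serializable> extends Map<K, V> {
}

/** Hypothetical in-memory state backed by a HashMap. */
final class InMemoryState<K extends Serializable, V extends Serializable>
        extends HashMap<K, V> implements State<K, V> {
    private static final long serialVersionUID = 1L;
}

final class StateDemo {
    /** Counts two retweets of the same (made-up) tweet ID and returns the total. */
    static int retweetCount() {
        State<String, Integer> state = new InMemoryState<>();
        state.merge("tweet-42", 1, Integer::sum);
        state.merge("tweet-42", 1, Integer::sum);
        return state.get("tweet-42");
    }

    public static void main(String[] args) {
        System.out.println(retweetCount());  // 2
    }
}
```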

To add state to a spout or a bolt, the component must implement an interface called IStatefulComponent, which provides the hooks for saving and retrieving state.

/**
 * Defines the interface for a component to save its internal state
 * in the state interface
 */
public interface IStatefulComponent<K extends Serializable,
                                    V extends Serializable>
                                                     extends IComponent {

    /**
    * Initializes the state of the spout/bolt to that of a previous
    * checkpoint. This method is invoked when a component is executed
    * as part of a recovery run. If there was no prior state
    * associated with the component, the state will be empty.
    *
    * Stateful spouts/bolts are expected to hold on to the state
    * variable to save their internal state.
    *
    * Note that initState(State<K, V> state) is called before spout
    * open() and bolt prepare().
    *
    * @param state the previously saved state of the component.
    */
    void initState(State<K, V> state);

    /**
    * This is a hook for the component to perform some actions just
    * before the framework saves its state.
    *
    * @param checkpointId the ID of the checkpoint
    */
    void preSave(String checkpointId);
}
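The order in which these hooks are called can be sketched with a small stand-alone harness. This is an illustration under stated assumptions, not Heron code: the interfaces are simplified local copies (IComponent is left out), and MapState, EventCounter, and LifecycleDemo are hypothetical names. The harness plays the role of the framework: it calls initState first, calls preSave just before a checkpoint, and later hands a copy of the checkpointed state to a fresh instance.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Simplified local stand-ins for the interfaces shown above.
interface State<K extends Serializable, V extends Serializable> extends Map<K, V> {
}

interface IStatefulComponent<K extends Serializable, V extends Serializable> {
    void initState(State<K, V> state);
    void preSave(String checkpointId);
}

final class MapState<K extends Serializable, V extends Serializable>
        extends HashMap<K, V> implements State<K, V> {
    private static final long serialVersionUID = 1L;
}

/** Hypothetical component that counts events and checkpoints the total. */
final class EventCounter implements IStatefulComponent<String, Long> {
    private State<String, Long> state;
    private long total;

    @Override
    public void initState(State<String, Long> state) {
        this.state = state;                            // hold on to the state object
        this.total = state.getOrDefault("total", 0L);  // empty state means a fresh start
    }

    void onEvent() {
        total++;
    }

    @Override
    public void preSave(String checkpointId) {
        state.put("total", total);                     // copy working data into the state
    }

    long total() {
        return total;
    }
}

final class LifecycleDemo {
    static long run() {
        State<String, Long> state = new MapState<>();
        EventCounter first = new EventCounter();
        first.initState(state);      // called before any processing
        first.onEvent();
        first.onEvent();
        first.preSave("ckpt-1");     // the framework would persist the state after this
        first.onEvent();             // happens after the checkpoint, lost in the crash

        State<String, Long> restored = new MapState<>();
        restored.putAll(state);      // stand-in for loading the checkpoint from storage
        EventCounter recovered = new EventCounter();
        recovered.initState(restored);
        return recovered.total();
    }

    public static void main(String[] args) {
        System.out.println(run());   // 2
    }
}
```

The recovered instance starts from the checkpointed total of two; the third event is not part of the checkpoint and would have to be replayed by the source.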

State storage

State Storage provides the abstract interface for storing and retrieving state. This abstract interface can be implemented for different types of storage systems.

public interface IStatefulStorage {
    /**
    * Initialize the stateful storage with the given config
    */
    void init(Map<String, Object> conf);

    /**
    * Cleanup the state storage backend
    */
    void close();

    // store the checkpoint
    boolean store(Checkpoint checkpoint);

    // retrieve the checkpoint
    boolean restore(Checkpoint checkpoint);

    // dispose the checkpoint
    boolean dispose(String topologyName, String oldestCheckpointId,
                    boolean deleteAll);
}
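As a rough idea of what a backend for this interface could look like, here is an in-memory sketch. Several things in it are assumptions, because the post does not show them: the Checkpoint class and its fields are invented here, restore is assumed to fill in the payload of the checkpoint passed to it, and dispose is assumed to delete every checkpoint older than oldestCheckpointId (or all of them when deleteAll is true), with checkpoint IDs ordered lexicographically.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical checkpoint holder; the real Checkpoint class is not shown in the post. */
final class Checkpoint {
    final String topologyName;
    final String checkpointId;
    byte[] payload;  // serialized state; filled in by restore()

    Checkpoint(String topologyName, String checkpointId, byte[] payload) {
        this.topologyName = topologyName;
        this.checkpointId = checkpointId;
        this.payload = payload;
    }
}

// Local copy of the interface shown above, so the sketch is self-contained.
interface IStatefulStorage {
    void init(Map<String, Object> conf);
    void close();
    boolean store(Checkpoint checkpoint);
    boolean restore(Checkpoint checkpoint);
    boolean dispose(String topologyName, String oldestCheckpointId, boolean deleteAll);
}

/** Hypothetical in-memory backend, useful only for tests and illustration. */
final class InMemoryStorage implements IStatefulStorage {
    // topology name -> (checkpoint ID -> serialized state), IDs kept sorted
    private final Map<String, TreeMap<String, byte[]>> data = new HashMap<>();

    @Override
    public void init(Map<String, Object> conf) {
        data.clear();
    }

    @Override
    public void close() {
        data.clear();
    }

    @Override
    public boolean store(Checkpoint c) {
        data.computeIfAbsent(c.topologyName, k -> new TreeMap<>())
            .put(c.checkpointId, c.payload.clone());
        return true;
    }

    @Override
    public boolean restore(Checkpoint c) {
        TreeMap<String, byte[]> byId = data.get(c.topologyName);
        if (byId == null || !byId.containsKey(c.checkpointId)) {
            return false;
        }
        c.payload = byId.get(c.checkpointId).clone();
        return true;
    }

    @Override
    public boolean dispose(String topologyName, String oldestCheckpointId, boolean deleteAll) {
        TreeMap<String, byte[]> byId = data.get(topologyName);
        if (byId == null) {
            return false;
        }
        if (deleteAll) {
            byId.clear();
        } else {
            byId.headMap(oldestCheckpointId, false).clear();  // drop strictly older IDs
        }
        return true;
    }
}

final class StorageDemo {
    static boolean run() {
        InMemoryStorage storage = new InMemoryStorage();
        storage.init(new HashMap<String, Object>());
        storage.store(new Checkpoint("wordcount", "ckpt-001", new byte[]{1}));
        storage.store(new Checkpoint("wordcount", "ckpt-002", new byte[]{2}));
        storage.dispose("wordcount", "ckpt-002", false);

        Checkpoint old = new Checkpoint("wordcount", "ckpt-001", null);
        Checkpoint latest = new Checkpoint("wordcount", "ckpt-002", null);
        return !storage.restore(old) && storage.restore(latest) && latest.payload[0] == 2;
    }
}
```

A real backend would write to durable storage such as a file system or a distributed store; the map here only stands in for that.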

Stateful topology

Now that we have defined the notion of state and the methods each component needs to implement for storing and recovering state, let us look at how a simple stateful word count topology can be written. In the topology, the TestWordSpout generates words from a list of words and passes them on to the ConsumerBolt, which counts the words.

The state in TestWordSpout is essentially the offset position in the word array. When the spout fails, it can resume from the offset saved at the last checkpoint.

public static class TestWordSpout extends BaseRichSpout
                            implements IStatefulComponent<String, Integer> {

    private SpoutOutputCollector collector;
    private String[] words;

    // working copy of the state, updated on every emitted tuple
    private Map<String, Integer> posMap;

    // checkpointed state: the current offset into the word array
    private State<String, Integer> posState;

    public void open(Map<String, Object> conf, TopologyContext context,
                                            SpoutOutputCollector acollector) {
        collector = acollector;
        words = new String[]{"nathan", "mike", "jackson", "golda", "bertels"};

        // initState() has already been called, so seed the working copy
        // from the restored state (empty on a fresh start)
        posMap = new HashMap<String, Integer>(posState);
    }

    public void nextTuple() {
        int offset = posMap.getOrDefault("current", 0);
        final String word = words[offset];
        // advance the offset, wrapping around at the end of the array
        posMap.put("current", ++offset % words.length);
        collector.emit(new Values(word));
    }

    // receive the state, restored from the last checkpoint after a failure
    @Override
    public void initState(State<String, Integer> state) {
        posState = state;
    }

    // copy the state before saving into persistent store
    @Override
    public void preSave(String checkpointId) {
        for (Map.Entry<String, Integer> entry : posMap.entrySet()) {
            posState.put(entry.getKey(), entry.getValue());
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

In the ConsumerBolt, the state is the count of each word seen so far. This state is saved at every checkpoint and restored when a failure occurs. The bolt variable countMap accumulates the word counts, and preSave transfers them to countState before the state is saved. Similarly, on recovery the retrieved state first populates countState through initState and is then transferred to countMap during the call to prepare.

public static class ConsumerBolt extends BaseRichBolt
                            implements IStatefulComponent<String, Integer> {

    private OutputCollector collector;

    // working copy of the word counts, updated on every tuple
    private Map<String, Integer> countMap;

    // checkpointed state: the word counts as of the last checkpoint
    private State<String, Integer> countState;

    public void prepare(Map<String, Object> conf, TopologyContext topologyContext,
                        OutputCollector outputCollector) {
        collector = outputCollector;

        // initState() has already been called, so seed the working copy
        // from the restored state (empty on a fresh start)
        countMap = new HashMap<String, Integer>(countState);
    }

    @Override
    public void execute(Tuple tuple) {
        String key = tuple.getString(0);
        int val = countMap.getOrDefault(key, 0);
        countMap.put(key, ++val);
    }

    // receive the state, restored from the last checkpoint after a failure
    @Override
    public void initState(State<String, Integer> state) {
        countState = state;
    }

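    // copy the state before saving into persistent store
    @Override
    public void preSave(String checkpointId) {
        for (Map.Entry<String, Integer> entry : countMap.entrySet()) {
            countState.put(entry.getKey(), entry.getValue());
        }
    }

    // this bolt is a sink and emits no tuples, so it declares no fields
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }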
}

Finally, the topology is assembled using TestWordSpout and ConsumerBolt. The code also sets the desired amount of resources and sets the TOPOLOGY_STATEFUL flag to true, indicating that this is a stateful topology.

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word", new TestWordSpout(), parallelism);
builder.setBolt("consumer", new ConsumerBolt(), parallelism)
        .fieldsGrouping("word", new Fields("word"));

Config conf = new Config();
conf.setNumStmgrs(parallelism);


// Set the config for resources here
conf.setComponentRam("word", ByteAmount.fromGigabytes(2));
conf.setComponentRam("consumer", ByteAmount.fromGigabytes(3));
conf.setContainerCpuRequested(6);

// For stateful processing
conf.put(Config.TOPOLOGY_STATEFUL, true);
conf.put(Config.TOPOLOGY_STATEFUL_CHECKPOINT_INTERVAL, 30);

HeronSubmitter.submitTopology(args[0], conf, builder.createTopology());
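To see why checkpointing the spout offset and the bolt counts together gives exact results, the word count above can be simulated without Heron. The sketch below is a single-threaded illustration under simplifying assumptions: RecoverySimulation and its parameters are hypothetical, the offset is an absolute position rather than the wrapped index used in TestWordSpout, checkpoints are taken every fixed number of tuples rather than on a timer, and a "crash" simply rolls both pieces of state back to the last checkpoint. How Heron coordinates checkpoints across distributed instances is not covered here.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical single-threaded simulation of checkpoint, crash, and replay. */
final class RecoverySimulation {
    static final String[] WORDS = {"nathan", "mike", "jackson", "golda", "bertels"};

    /**
     * Counts the first `total` words, checkpointing every `checkpointEvery`
     * words. If `crashAfter` is positive, the run fails once after that many
     * words and restarts from the last checkpoint.
     */
    static Map<String, Integer> run(int total, int checkpointEvery, int crashAfter) {
        int offset = 0;                                  // spout state
        Map<String, Integer> counts = new HashMap<>();   // bolt state

        int savedOffset = 0;                             // last checkpoint
        Map<String, Integer> savedCounts = new HashMap<>();
        boolean crashed = crashAfter <= 0;               // no crash requested

        while (offset < total) {
            counts.merge(WORDS[offset % WORDS.length], 1, Integer::sum);
            offset++;

            if (offset % checkpointEvery == 0) {
                // save spout and bolt state together as one consistent snapshot
                savedOffset = offset;
                savedCounts = new HashMap<>(counts);
            }
            if (!crashed && offset == crashAfter) {
                // failure: roll back both states, then replay from savedOffset
                crashed = true;
                offset = savedOffset;
                counts = new HashMap<>(savedCounts);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> clean = run(12, 5, -1);
        Map<String, Integer> recovered = run(12, 5, 8);
        System.out.println(clean.equals(recovered));  // true
    }
}
```

If only the counts were restored and the spout restarted from its in-memory position, the words emitted between the checkpoint and the crash would be missing from the result; if only the offset were rolled back, they would be counted twice.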

With the inclusion of support for exactly-once, Heron provides at-most-once, at-least-once, and exactly-once guarantees seamlessly in a single streaming engine. This provides the flexibility for developers to choose the appropriate guarantees depending on the application. We’re looking forward to releasing exactly-once to open source soon for wider consumption, and we welcome collaborators and contributors joining the growing Heron community.
