September 5, 2017
This is part 2 of a series of blog posts that highlights key features of Heron, a next-generation stream processing system developed at Twitter. In part 1 of the series, we discussed how Heron supports multi-tenancy, native support for containers, resource isolation at all levels, a flexible resource allocation model, and flexible deployment scenarios, and we showed how some of these features allow for greatly improved developer productivity. In this post, we’ll continue the discussion by showing how Heron achieves seamless support for different processing semantics, how its modular design makes it efficient, extensible, and scalable, and we’ll also shine light on its security model, multi-language API, and easy operability.
When we say “delivery semantics,” what we mean is the guarantee with which tuples (the core data type underlying Heron topologies) are delivered to Heron applications by the underlying streaming system. Different streaming applications require different types of processing guarantees. For some applications, such as one computing Twitter trends, a small amount of data loss might be tolerable. A billing application, on the other hand, would require zero data loss. In order to accommodate different requirements, Heron supports three types of delivery semantics: at most once, at least once, and effectively once.
At-most-once semantics — In this mode, Heron provides best-effort delivery of tuples. If there are failures, such as hardware failures, some tuples might be lost and will not be retried. This tends to be best for applications for which a certain amount of data loss is acceptable.
At-least-once semantics — When running in this mode, Heron guarantees that tuples will be processed by the application at least once. Heron keeps track of the tuples as they are produced by different topology components and ensures that they are all retired at the end of processing. If failures or timeouts occur, Heron will retry those tuples.
Effectively-once semantics — This mode, commonly referred to as “exactly-once” by many other streaming platforms (please stay tuned for a follow-up blog post discussing the differences), guarantees that tuples that cause state changes are effectively processed once. To be more concrete, Heron guarantees that each individual tuple will contribute to any state change only once. Heron will ensure, via its distributed state checkpointing mechanism, that if a tuple fails to be processed, the state of all components in the application will be rolled back to a previous successful checkpoint. Heron will also start reading from incoming data streams at the correct offset that corresponds to the checkpoint that all the states in the application were rolled back to.
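To make the rollback idea concrete, here is a toy, single-process sketch of checkpoint-and-replay. It is not Heron code: the `CountingTopology` class, its method names, and the `restore_state` flag are all hypothetical, and real Heron checkpoints are distributed across components. With `restore_state=False` the sketch replays input without rolling state back, which is a simplified stand-in for how replay alone can count a tuple twice.

```python
class CountingTopology:
    """Hypothetical stand-in for a stateful word-count topology."""

    def __init__(self, source, checkpoint_every=3):
        self.source = list(source)        # replayable input stream
        self.checkpoint_every = checkpoint_every
        self.offset = 0                   # next source position to read
        self.counts = {}                  # application state
        self._saved = (0, {})             # last checkpoint: (offset, state)

    def checkpoint(self):
        # Snapshot the state together with the offset it corresponds to.
        self._saved = (self.offset, dict(self.counts))

    def recover(self, restore_state):
        # Always rewind the source to the checkpointed offset.
        saved_offset, saved_counts = self._saved
        self.offset = saved_offset
        # Rolling the state back too is what prevents double counting.
        if restore_state:
            self.counts = dict(saved_counts)

    def run(self, fail_at=None, restore_state=True):
        """Process the whole source, failing once just before index fail_at."""
        while self.offset < len(self.source):
            if self.offset == fail_at:
                fail_at = None            # simulate a single failure
                self.recover(restore_state)
                continue
            word = self.source[self.offset]
            self.counts[word] = self.counts.get(word, 0) + 1
            self.offset += 1
            if self.offset % self.checkpoint_every == 0:
                self.checkpoint()
        return self.counts


words = ["a", "b", "a", "c", "a", "b", "c"]
print(CountingTopology(words).run(fail_at=4))
# {'a': 3, 'b': 2, 'c': 2}
print(CountingTopology(words).run(fail_at=4, restore_state=False))
# {'a': 3, 'b': 2, 'c': 3}
```

In the first run, the tuple processed after the last checkpoint is undone by the rollback and then replayed, so it contributes to the counts only once. In the second run, the same tuple is replayed on top of state that already includes it, so `c` is over-counted.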
If you are interested in a deeper discussion about processing semantics, check out the “Heron Delivery Semantics” video series by Sanjeev Kulkarni, co-founder of Streamlio (part 1 and part 2).
A streaming system should be highly efficient. An inefficient system uses more resources to achieve the same throughput as a more efficient one. Heron was designed with efficiency in mind right from the start for large data processing environments. Based on detailed performance evaluation in production settings, Heron can deliver throughput of 170 million tuples per second on 17 machines, with each machine consisting of 12 cores and 2 hyperthreads per core, resulting in 24 hyperthreads per machine processing data.
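As a quick back-of-the-envelope check, the figures quoted above can be broken down per machine and per hyperthread. The calculation below takes those figures at face value and assumes, purely for illustration, that load is spread evenly across machines and hyperthreads.

```python
# Figures quoted in the text above.
TOTAL_TUPLES_PER_SEC = 170_000_000
MACHINES = 17
CORES_PER_MACHINE = 12
HYPERTHREADS_PER_CORE = 2

# Assumption: perfectly even load distribution (real workloads will vary).
hyperthreads_per_machine = CORES_PER_MACHINE * HYPERTHREADS_PER_CORE
per_machine = TOTAL_TUPLES_PER_SEC // MACHINES
per_hyperthread = per_machine / hyperthreads_per_machine

print(hyperthreads_per_machine)   # 24
print(per_machine)                # 10000000
print(round(per_hyperthread))     # 416667
```

Under that even-split assumption, each machine handles about 10 million tuples per second, or roughly 417,000 per hyperthread.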
Since the amount of data in the world is growing by an estimated 40% per year, enterprises need to have systems and solutions in place to accommodate this growth. Scaling a system requires fundamentally different thinking in system design and how to efficiently use resources. Heron was designed for high scalability along multiple dimensions:
Heron has demonstrated scalability in all three dimensions in production, running topologies consisting of 800-1000 containers each as well as topologies consisting of 50-60 components in each of the topology’s directed acyclic graphs (DAGs).
Heron topologies can be used for a highly diverse set of streaming applications. Hence, the SLA requirements for these applications can vary. A critical stock trading application, for example, might require a few milliseconds of latency, while an application that computes Twitter trends might require high throughput with a few seconds of latency. Heron provides a rich set of parameters that allows you to fine-tune performance in accordance with SLA requirements on a per-topology basis. Unlike other systems that depend on a single set of parameters for all processing jobs, Heron topologies can be tuned on a topology-by-topology basis, which ensures that changing the parameters for one topology won’t affect other topologies. Several topologies with different SLA-derived parameters can thus easily co-exist in the same cluster.
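The isolation property described above can be pictured with a small sketch. The parameter names below (`max_pending_tuples`, `batch_flush_ms`, `containers`) and the `retune` helper are hypothetical illustrations, not Heron's actual configuration keys; the point is only that each topology carries its own settings.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class TopologyTuning:
    """Hypothetical per-topology knobs; not Heron's real config keys."""
    max_pending_tuples: int = 1000
    batch_flush_ms: int = 10
    containers: int = 4


DEFAULTS = TopologyTuning()

# Two topologies with different SLA-derived settings in the same cluster.
cluster = {
    "stock-trading": replace(DEFAULTS, batch_flush_ms=1, max_pending_tuples=100),
    "trends": replace(DEFAULTS, batch_flush_ms=500,
                      max_pending_tuples=50_000, containers=32),
}


def retune(cluster, name, **changes):
    """Return a new cluster mapping in which only `name` is retuned."""
    updated = dict(cluster)
    updated[name] = replace(cluster[name], **changes)
    return updated


retuned = retune(cluster, "trends", batch_flush_ms=1000)
print(retuned["trends"].batch_flush_ms)         # 1000
print(retuned["stock-trading"].batch_flush_ms)  # 1
```

Retuning the throughput-oriented topology leaves the latency-sensitive one untouched, which is the behavior the per-topology model is meant to guarantee.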
The “Big Data” ecosystem is highly complex, and continuous innovation has led to a profusion of infrastructure components. In this continuously evolving ecosystem, we wanted to ensure that Heron can adapt and still retain its well-tested, battle-hardened, productionized core. Heron was designed with extensibility in mind, and pretty much any component can be replaced with a different implementation. For example, the state manager, which provides distributed synchronization, has multiple implementations, one for Apache ZooKeeper and another for local filesystems. Similarly, the checkpoint manager for storing snapshots provides implementations for Hadoop and again for local filesystems.
Let’s say that you want to use etcd as a state manager. You can add an etcd implementation using the well-defined state manager interface and the rest of the system remains unchanged. Similarly, if you want to use Ceph for storing checkpoints, you can add another checkpoint manager implementation and the system will start using the implementation if you apply the proper config setting.
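The pattern described here is a plain interface-plus-registry design. The sketch below is illustrative only: the `StateManager` interface, the registry, and the `statemgr` config key are hypothetical and much simpler than Heron's real state manager interface, and the in-memory class merely stands in for a networked backend such as etcd without using any real client.

```python
import json
import tempfile
from abc import ABC, abstractmethod
from pathlib import Path


class StateManager(ABC):
    """Hypothetical minimal interface; Heron's real one is richer."""

    @abstractmethod
    def set(self, key, value): ...

    @abstractmethod
    def get(self, key): ...


class LocalFileStateManager(StateManager):
    """Stores each key as a JSON file under a root directory."""

    def __init__(self, root):
        self.root = Path(root)
        self.root.mkdir(parents=True, exist_ok=True)

    def set(self, key, value):
        (self.root / f"{key}.json").write_text(json.dumps(value))

    def get(self, key):
        return json.loads((self.root / f"{key}.json").read_text())


class InMemoryStateManager(StateManager):
    """Stand-in for a new backend (e.g. etcd); no real client is used."""

    def __init__(self):
        self._data = {}

    def set(self, key, value):
        self._data[key] = dict(value)

    def get(self, key):
        return dict(self._data[key])


# Adding a backend means adding one entry here; callers do not change.
REGISTRY = {
    "localfs": lambda cfg: LocalFileStateManager(cfg["root"]),
    "memory": lambda cfg: InMemoryStateManager(),
}


def make_state_manager(cfg):
    """Pick the implementation named by the (hypothetical) config key."""
    return REGISTRY[cfg["statemgr"]](cfg)


def register_topology(statemgr, name, containers):
    """Caller code depends only on the StateManager interface."""
    statemgr.set(name, {"containers": containers})
    return statemgr.get(name)


with tempfile.TemporaryDirectory() as root:
    for cfg in ({"statemgr": "localfs", "root": root}, {"statemgr": "memory"}):
        print(register_topology(make_state_manager(cfg), "wordcount", 4))
```

Both configurations print the same record, since `register_topology` never needs to know which backend is in use.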
Heron provides two tools for managing topologies: Heron Tracker and Heron UI. Heron Tracker provides REST API endpoints for querying the state of topologies and for fetching operational metrics. The REST API makes it easy to write custom tools that provide visibility into the state of topologies. In fact, Heron UI itself uses the Heron Tracker REST API. Furthermore, each topology emits a wide variety of metrics using a process called the metrics manager. The metrics manager has several plugins for exporting metrics to third-party systems. The currently supported plugins:
Heron topology metrics are continuously monitored to automatically identify issues and problems that might arise in running topologies. These issues and problems are bubbled up to the user so that timely action can be taken.
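As a rough illustration of what automated metric monitoring can look like, the sketch below flags components whose metric stays above a threshold for several consecutive samples. The function, the component names, and the sample values are all hypothetical; this is not how Heron's own health checks are implemented.

```python
def flag_unhealthy(samples, threshold, min_consecutive=3):
    """Return components whose metric exceeded `threshold` for at least
    `min_consecutive` samples in a row."""
    flagged = []
    for component, values in samples.items():
        streak = 0
        for value in values:
            # A sample at or below the threshold resets the streak.
            streak = streak + 1 if value > threshold else 0
            if streak >= min_consecutive:
                flagged.append(component)
                break
    return flagged


# Hypothetical per-component metric samples (e.g. milliseconds of delay).
samples = {
    "spout": [0, 5, 0, 7],
    "count-bolt": [0, 120, 150, 300],
    "sink-bolt": [200, 0, 200, 0],
}
print(flag_unhealthy(samples, threshold=100))
# ['count-bolt']
```

Requiring a streak rather than a single spike is one simple way to avoid alerting on transient blips such as those in `sink-bolt`.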
Heron is currently being used for a wide range of use cases across multiple organizations, such as Twitter, Google, Microsoft, and ndustrial.io. This broad set of use cases includes but is not limited to:
These use cases have evolved over a period of time and via feedback about Heron. Heron features have thus been continuously expanded to accommodate new use cases.
Heron has been running in production for more than three years at Twitter. It currently processes around 250 billion events per day in production and is used by numerous internal services. In total, it has processed more than 275 trillion events to date, and it runs on thousands of machines powering several critical applications at Twitter.
Heron is a next-generation streaming system designed to address several shortcomings in other open source streaming systems. In this two-part series, we’ve highlighted several features of Heron that make it more than ready for enterprise-wide deployment. In this post, we summarized additional capabilities such as multiple processing semantics, scalability, efficiency, support for mixed workloads, operational maturity, and how its modular design helps in quickly adding new functionality.
If you’re interested in Heron, you may want to participate in the community via:
And of course, for more information about the Heron project in general, go to heron.io and follow us on Twitter @heronstreaming.
We would like to acknowledge Viktor Klang for coining the term “effectively once.”