Last year, Twitter open sourced its next-generation streaming engine, Heron. Heron has already attracted contributors from a variety of institutions looking to greatly extend their stream processing capabilities for use cases ranging from finance to the Internet of Things (IoT). To meet the diverse requirements of these organizations, Heron must satisfy a broad set of needs, described below.
Heron is deployed at Twitter, for example, using Aurora running on top of Mesos. Data-driven organizations already manage a solution stack based on the YARN scheduler, since it supports several batch processing systems, such as Apache Hive and Apache Spark, on top of Hadoop. Recently, organizations have started adopting Kubernetes for stateless services, and plan to eventually move stateful services as well. Running Heron side by side with these systems reduces the total cost of ownership and also improves maintainability and manageability. Similarly, streaming application developers want the flexibility to optimize their applications for performance or for deployment cost when running in public clouds. Along the same lines, a data scientist may want to use Python to enhance a machine learning model, while a financial analyst might prefer C++ for a high-performance trading application.
To address these challenges, we, the original creators of Heron, designed the system to be modular from the ground up. In Heron’s architecture, each module provides the minimum functionality needed to perform a particular operation. These modules communicate with one another using well-defined communication protocols. As a consequence, Heron is highly extensible: an application developer, system administrator, or Heron contributor can create new implementations of specific modules and plug them into the system without affecting other modules or the communication mechanisms between them. The major benefit of this architecture is that different Heron streaming applications can operate seamlessly on the same resources while using different module implementations. One application might be interested in minimizing total cost, for example, while another might prefer to manage resources for load balancing. This is critical for organizations with diverse workloads.
To the best of our knowledge, Heron is the first streaming system to adopt and implement a modular architecture. This architecture was inspired by microkernel-based operating systems, as shown in Figure 1. Microkernel systems, unlike monolithic kernel-based operating systems, provide only a core set of services, such as inter-process communication (IPC) and scheduling. Other critical services, such as networking, file systems, and virtual memory management, run in user space, outside the kernel. In monolithic kernels, by contrast, these modules are part of the kernel itself, which means that a considerable amount of work is required to adapt the kernel to new environments and hardware.
Similarly, the Big Data platform ecosystem consists of several disparate software systems running in heterogeneous environments. Hence, we conceived Heron as a collection of extensible, self-contained modules running on top of a kernel that provides the fundamental building blocks needed to build a streaming engine. Figure 2 below shows the different Heron modules and how they interact with each other. In contrast with the monolithic architectures of popular open source streaming systems, Heron consists of several modules that communicate through basic inter/intra-process communication (IPC) mechanisms. Since streaming systems are fundamentally data movers, the IPC mechanism essentially serves as the “kernel” of the system. Every other Heron module runs as an operating system process and can be easily extended and plugged into the system, similar to services in a microkernel operating system.
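To make this design concrete, here is a toy sketch of the microkernel-style pattern: independent modules that interact only through a minimal message-passing “kernel”. All names here (`Kernel`, `Module`, `EchoModule`) are our own illustrations, not Heron’s actual API, and the direct dispatch stands in for Heron’s real inter/intra-process protocols.

```python
# Toy sketch of a microkernel-style design: modules never call each other
# directly; they only exchange messages through a minimal "kernel".
# All names are illustrative, not Heron's real API.

class Kernel:
    """Minimal IPC layer: routes messages between registered modules."""
    def __init__(self):
        self.modules = {}

    def register(self, name, module):
        self.modules[name] = module
        module.kernel = self

    def send(self, target, message):
        # In a real system this would cross process boundaries over IPC;
        # here it is a direct dispatch for illustration.
        return self.modules[target].receive(message)

class Module:
    def __init__(self):
        self.kernel = None

    def receive(self, message):
        raise NotImplementedError

class EchoModule(Module):
    """A trivial pluggable module: it can be swapped without touching the kernel."""
    def receive(self, message):
        return ("echo", message)

kernel = Kernel()
kernel.register("echo", EchoModule())
print(kernel.send("echo", "tuple-1"))  # ('echo', 'tuple-1')
```

Because modules only know the kernel’s `send`/`receive` contract, replacing one implementation with another leaves every other module untouched, which is the essence of the architecture described above.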
As mentioned in the Introduction to Heron blog post, a Heron topology is a directed acyclic graph (DAG) of spouts and bolts. Spouts are the sources of data, while bolts perform computation on the streams they receive from spouts or other bolts. As described in detail in that blog post, when a topology is submitted, a number of containers are spawned, with the first container running the scheduler and the topology master. The topology master manages the topology during its lifecycle. The remaining containers run a stream manager, a metrics manager, and a set of Heron instances. Let us look at each of these modules in more detail.
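Before diving into the modules, a minimal conceptual sketch of a spout-and-bolt DAG may help; this is plain Python for illustration, not Heron’s topology API. A spout emits sentences, a split bolt tokenizes them, and a count bolt aggregates word counts downstream.

```python
# Conceptual spout/bolt DAG (illustrative only, not Heron's API):
# spout -> split bolt -> count bolt.
from collections import Counter

class SentenceSpout:
    """Source of data: emits a fixed list of sentences."""
    def __init__(self, sentences):
        self.sentences = sentences

    def emit(self):
        yield from self.sentences

class SplitBolt:
    """Computes on the stream it receives: splits sentences into words."""
    def process(self, sentence):
        return sentence.split()

class CountBolt:
    """Receives from another bolt: maintains running word counts."""
    def __init__(self):
        self.counts = Counter()

    def process(self, words):
        self.counts.update(words)

spout = SentenceSpout(["heron streams data", "heron scales"])
split, count = SplitBolt(), CountBolt()
for sentence in spout.emit():
    count.process(split.process(sentence))
print(count.counts["heron"])  # 2
```

In an actual deployment, each of these operators would run as a Heron instance inside a container, with tuples moving between them via stream managers rather than direct method calls.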
The resource manager calculates how many resources (CPU, memory, disk) are allocated to a particular topology based on how the topology’s instances are packed into executable units called containers. Heron provides several packing algorithms out of the box. If you need to change how instances are packed into containers, you have the flexibility to add a packing algorithm of your own.
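As an illustration of what a packing algorithm does, here is a hypothetical round-robin packer (the function name and plan structure are our own, not one of Heron’s built-in algorithms): it assigns instances to containers in turn and sums the per-container resource requests.

```python
# Hypothetical round-robin packing: distribute topology instances across a
# fixed number of containers, accumulating per-container resource totals.
def round_robin_pack(instances, num_containers):
    """instances: list of (name, cpu, ram_mb) tuples.
    Returns a dict: container id -> {'instances', 'cpu', 'ram_mb'}."""
    containers = {c: {"instances": [], "cpu": 0.0, "ram_mb": 0}
                  for c in range(num_containers)}
    for idx, (name, cpu, ram) in enumerate(instances):
        target = containers[idx % num_containers]  # round-robin assignment
        target["instances"].append(name)
        target["cpu"] += cpu
        target["ram_mb"] += ram
    return containers

plan = round_robin_pack(
    [("spout-1", 1.0, 512), ("bolt-1", 2.0, 1024), ("bolt-2", 2.0, 1024)],
    num_containers=2)
print(plan[0]["instances"])  # ['spout-1', 'bolt-2']
```

A custom packing algorithm would implement the same “instances in, container plan out” contract while optimizing for a different objective, such as minimizing the number of containers or balancing load.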
The scheduler module is responsible for interacting with scheduling frameworks, such as Aurora, Mesos, and Kubernetes, and for allocating the necessary resources based on input from the resource manager. If you want to run Heron on a brand new scheduler, you only need to add an implementation for it; the rest of the components remain unchanged.
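The pluggable-scheduler idea can be sketched as a small common interface with interchangeable backends. The interface and class names below are our own illustration, not Heron’s actual scheduler SPI.

```python
# Illustrative pluggable-scheduler pattern: every backend implements the same
# small interface, so swapping schedulers does not touch the rest of the system.
class Scheduler:
    def launch(self, packing_plan):
        """Start one execution unit per container in the packing plan."""
        raise NotImplementedError

class LocalScheduler(Scheduler):
    def launch(self, packing_plan):
        # A real implementation would fork local processes.
        return [f"local-process-{cid}" for cid in sorted(packing_plan)]

class KubernetesScheduler(Scheduler):
    def launch(self, packing_plan):
        # A real implementation would create pods via the Kubernetes API.
        return [f"pod-{cid}" for cid in sorted(packing_plan)]

SCHEDULERS = {"local": LocalScheduler, "kubernetes": KubernetesScheduler}

def submit(packing_plan, scheduler_name):
    """Pick a scheduler backend by name and launch the topology."""
    return SCHEDULERS[scheduler_name]().launch(packing_plan)

print(submit({0: {}, 1: {}}, "kubernetes"))  # ['pod-0', 'pod-1']
```

Adding support for a new scheduling framework then amounts to writing one more `Scheduler` subclass and registering it, without modifying the submission path.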
The state manager provides distributed coordination and stores topology metadata, such as the physical and logical plans. There are existing state manager implementations for ZooKeeper, for clustered environments, as well as for the local file system, for running on a single server or laptop. The state manager can be easily mapped onto other distributed coordination layers, such as etcd (which is used by Kubernetes).
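The same interface-plus-implementations pattern applies here. The sketch below uses an in-memory dictionary as a stand-in for the local-filesystem backend; a ZooKeeper or etcd version would implement the same interface against znodes or keys. All names are illustrative.

```python
# Sketch of a state-manager abstraction. The in-memory backend stands in for
# the local-filesystem implementation; names are illustrative, not Heron's.
import json

class StateManager:
    """Stores and retrieves topology metadata (e.g., physical/logical plans)."""
    def set_topology_metadata(self, topology, metadata):
        raise NotImplementedError

    def get_topology_metadata(self, topology):
        raise NotImplementedError

class InMemoryStateManager(StateManager):
    def __init__(self):
        self._store = {}

    def set_topology_metadata(self, topology, metadata):
        # Serialize, as a file- or znode-backed implementation would.
        self._store[topology] = json.dumps(metadata)

    def get_topology_metadata(self, topology):
        return json.loads(self._store[topology])

sm = InMemoryStateManager()
sm.set_topology_metadata("word-count", {"containers": 2, "state": "RUNNING"})
print(sm.get_topology_metadata("word-count")["state"])  # RUNNING
```

Because every module reads and writes topology state only through this interface, retargeting Heron to a new coordination service is localized to one new implementation.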
The topology master manages the topology during its lifecycle. It is responsible for assembling the physical plan of the topology during startup, or after the failure of a container, and for broadcasting the physical plan to the remaining containers.
The stream manager routes tuples between instances, whether they reside in the same container or in different containers. If you want to run Heron on top of a new interconnect, such as InfiniBand, the underlying network module can be easily mapped to the new API.
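The two routing paths, intra-container and inter-container, can be illustrated with a toy model in which each container’s stream manager delivers local tuples directly and forwards the rest to the peer stream manager that owns the destination instance. Names and structure are illustrative, not Heron’s implementation.

```python
# Toy stream-manager routing: local instances get tuples directly;
# tuples for other containers are forwarded to that container's manager.
class StreamManager:
    def __init__(self, container_id, local_instances):
        self.container_id = container_id
        self.local_instances = set(local_instances)
        self.peers = {}          # container_id -> StreamManager
        self.delivered = []      # (instance, tuple) pairs delivered locally

    def connect(self, other):
        """Establish a bidirectional link between two stream managers."""
        self.peers[other.container_id] = other
        other.peers[self.container_id] = self

    def route(self, instance, tup):
        if instance in self.local_instances:
            self.delivered.append((instance, tup))   # intra-container path
            return
        for peer in self.peers.values():             # inter-container path
            if instance in peer.local_instances:
                peer.route(instance, tup)
                return

sm1 = StreamManager(1, ["spout-1"])
sm2 = StreamManager(2, ["bolt-1"])
sm1.connect(sm2)
sm1.route("bolt-1", "hello")
print(sm2.delivered)  # [('bolt-1', 'hello')]
```

Swapping the transport underneath `route` (TCP sockets, shared memory, InfiniBand verbs) would not change this routing logic, which is why the network layer can be replaced in isolation.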
The Heron instance runs the user logic code, i.e., the operator code of the topology DAG. It receives data from the stream manager, processes it, and sends the results back to the stream manager for subsequent processing, if needed.
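The core of an instance is a small loop: pull a tuple in, apply the user’s operator logic, push any emitted tuples back out. The sketch below uses in-process queues as stand-ins for the stream manager connections; in Heron the instance talks to the stream manager over IPC.

```python
# Minimal instance-style loop (illustrative): inbound and outbound queues
# stand in for the connections to the stream manager.
from queue import Queue

def run_instance(user_fn, inbound, outbound):
    """Drain the inbound queue, apply user logic, emit results outbound."""
    while not inbound.empty():
        tup = inbound.get()
        for result in user_fn(tup):   # user logic may emit 0..n tuples
            outbound.put(result)

inbound, outbound = Queue(), Queue()
for sentence in ["heron is modular", "so is this example"]:
    inbound.put(sentence)

# The "user logic" here is a split operator, as in a word-count bolt.
run_instance(lambda s: s.split(), inbound, outbound)
print(outbound.qsize())  # 7
```

Because the loop is language-agnostic in shape, supplying an equivalent instance runtime in another language is what enables end-to-end execution of a topology in that language, as discussed below.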
Heron’s modular architecture provides flexibility to both application developers and core Heron developers. This enables Heron to work with multiple programming languages, schedulers, and packing policies, as described below.
Heron can easily support topologies written in different languages. Heron developers can add support for a language by supplying an instance implementation in that language, so that end-to-end execution of the topology happens in the same language. Because the system is modularized, adding support for a new language takes just a few weeks.
Along the same lines, an application developer who operates on top of a YARN cluster can use the YARN implementation of the scheduler. If her environment changes, she can easily switch to another scheduler implementation (e.g., Kubernetes) with just a few simple configuration changes, without having to change her topology or her Heron setup. Heron developers can add support for a new scheduler in a matter of a few weeks.
Similarly, the application developer can pick different packing algorithms that better suit her needs, or even implement new policies. The user can configure Heron modules either at topology submission time through the command line or using special configuration files. Heron developers can add algorithms that optimize for particular resources or achieve locality, depending on the shape and size of the topology DAG.
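In practice, this kind of module selection boils down to configuration. A hypothetical configuration fragment might select a scheduler and a packing algorithm like this; the keys and class names below are illustrative, not Heron’s exact configuration names.

```yaml
# Illustrative module selection; actual Heron keys and class names may differ.
heron.class.scheduler: com.acme.heron.scheduler.KubernetesScheduler
heron.class.packing.algorithm: com.acme.heron.packing.RoundRobinPacking
```

Switching schedulers or packing policies is then a configuration change rather than a code change, which is what keeps the topology itself untouched.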
With its modular architecture, Heron runs in production in several different environments.
In later blog posts, we will elaborate more on integration. If you’re interested in Heron, you can participate in the community via: