Streaming MapReduce with Summingbird

Tuesday, 3 September 2013

Today we are open sourcing Summingbird on GitHub under the Apache License, Version 2.0 (ALv2).

Summingbird is a library that lets you write streaming MapReduce programs that look like native Scala or Java collection transformations and execute them on a number of well-known distributed MapReduce platforms like Storm and Scalding.

For example, a word-counting aggregation in pure Scala might look like this:

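import scala.collection.mutable.{Map => MutableMap}

def wordCount(source: Iterable[String], store: MutableMap[String, Long]) =
  source.flatMap { sentence =>
    toWords(sentence).map(_ -> 1L)
  // `get` returns an Option, so default a missing key to 0 before adding
  }.foreach { case (k, v) => store.update(k, store.getOrElse(k, 0L) + v) }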

However, counting words in Summingbird looks like this:

def wordCount[P <: Platform[P]]
  (source: Producer[P, String], store: P#Store[String, Long]) =
    source.flatMap { sentence =>
      toWords(sentence).map(_ -> 1L)
    }.sumByKey(store)

The logic is exactly the same and the code is almost the same. The main difference is that you can execute the Summingbird program in:

  • batch mode (using Scalding on Hadoop)
  • real-time mode (using Storm)
  • hybrid batch/real-time mode (offers attractive fault-tolerance properties)

Building key-value stores for real-time serving is a special focus. Summingbird provides you with the foundation you need to build rock solid production systems.
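
To make the `sumByKey` step concrete, here is a minimal in-memory sketch of what it computes for word counts. This is not Summingbird code: the `InMemoryWordCount` object, its `sumByKey` function, and the `toWords` tokenizer are stand-ins written for illustration (the post leaves `toWords` undefined), and an immutable `Map` plays the role of the store.

```scala
object InMemoryWordCount {
  // Hypothetical tokenizer: lowercase the sentence and split on whitespace.
  def toWords(sentence: String): Seq[String] =
    sentence.toLowerCase.split("\\s+").filter(_.nonEmpty).toSeq

  // In-memory stand-in for `sumByKey`: merge each (key, value) pair into the
  // running total for that key using Long addition.
  def sumByKey(pairs: Iterable[(String, Long)]): Map[String, Long] =
    pairs.foldLeft(Map.empty[String, Long]) { case (acc, (k, v)) =>
      acc.updated(k, acc.getOrElse(k, 0L) + v)
    }

  // Same shape as the examples above: flatMap to (word, 1) pairs, then sum by key.
  def wordCount(source: Iterable[String]): Map[String, Long] =
    sumByKey(source.flatMap(sentence => toWords(sentence).map(_ -> 1L)))
}
```

Calling `InMemoryWordCount.wordCount(Seq("a b a", "b c"))` yields a map with `a` and `b` counted twice and `c` once. The Summingbird version hands the same per-key summing to whichever platform runs the job.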

History and Motivation

Before Summingbird, users at Twitter who wanted to write production streaming aggregations would typically write their logic using a Hadoop DSL like Pig or Scalding. These tools offered nice distributed system abstractions: Pig resembled familiar SQL, while Scalding, like Summingbird, mimics the Scala collections API. By running these jobs on a regular schedule (typically hourly or daily), users could build time series dashboards with very reliable error bounds, at the unfortunate cost of high latency.

While using Hadoop for these types of workloads is effective, Twitter is about real-time and we needed a general system to deliver data in seconds, not hours. Twitter’s release of Storm made it easy to process data with very low latencies by sacrificing Hadoop’s fault-tolerance guarantees. However, we soon realized that running a fully real-time system on Storm was quite difficult for two main reasons:

  • Recomputation over months of historical logs must be coordinated with Hadoop or streamed through Storm with a custom log loading mechanism
  • Storm is focused on message passing, and random-write databases are harder to maintain

The types of aggregations one can perform in Storm are very similar to what’s possible in Hadoop, but the system issues are very different. Summingbird began as an investigation into a hybrid system that could run a streaming aggregation in both Hadoop and Storm and merge the results automatically, without special consideration from the job author. The hybrid model allows most data to be processed by Hadoop and served out of a read-only store. Only data that Hadoop hasn’t yet been able to process (data that falls within the latency window) is served out of a datastore populated in real time by Storm. The error of the real-time layer is bounded, because Hadoop will eventually process the same data and smooth out any error introduced. This hybrid model is appealing because you get well-understood, transactional behavior from Hadoop and up-to-the-second additions from Storm. Despite the appeal, the hybrid approach has the following practical problems:

  • Two sets of aggregation logic have to be kept in sync in two different systems
  • Keys and values must be serialized consistently between each system and the client
  • The client is responsible for reading from both datastores, performing a final aggregation and serving the combined results

Summingbird was developed to provide a general solution to these problems.
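
To make the third problem concrete, here is a minimal sketch of the client-side merge that a hand-built hybrid system requires. The `HybridClient` object and its `lookup` function are hypothetical names, and plain immutable maps stand in for the batch and real-time datastores. The sketch assumes the two stores cover disjoint slices of the data, so adding their values does not double count.

```scala
object HybridClient {
  // Hypothetical read-only views of the two stores, keyed by word:
  //   batch    - totals Hadoop has already computed
  //   realtime - counts Storm has accumulated for data Hadoop has not processed yet
  def lookup(
      batch: Map[String, Long],
      realtime: Map[String, Long]
  )(key: String): Option[Long] =
    (batch.get(key), realtime.get(key)) match {
      // Present in both layers: the final aggregation is a simple sum.
      case (Some(b), Some(r)) => Some(b + r)
      // Present in at most one layer: return whichever value exists.
      case (b, r)             => b.orElse(r)
    }
}
```

Even in this toy form, the client has to know both stores, the value type, and the merge function, and all three must agree with the batch and real-time jobs.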

The Way of the Summingbird

Summingbird was designed to provide a streaming MapReduce model that can always be run in real-time, batch, or hybrid-merged modes, and that goal drove certain design choices. All input or output data fits into one of a few categories: Source, Service, Store, or Sink (see the core concepts on the wiki). Like Hadoop, all state is kept in the data, and not with the workers.

Events enter the system via a Source. When we are computing a new value to serve, we are always merging into a Store, which holds the value for each key. That merging operation is always associative, and this associativity is exploited both for parallelism and to enable the merging of hybrid online/offline jobs. In algebraic terms, all our merge operations are Monoids or Semigroups, and we have developed a considerable collection of them for reuse.

To do a join or lookup, we use a Service. A Service is a kind of real-time key-value readable store, e.g. a read from a database. In the offline mode, a Service is implemented as one of several types of joins. When we want to export a data stream, we write to a Sink. In real-time mode, this might mean pushing onto a queue; on Hadoop, it just means materializing a new directory that covers some date range.

With these four primitives we can easily compose Summingbird jobs: a store of one job becomes a service of another; sinks from one job become sources for another. This composability is very powerful and allows modular scaling of your computations. Once a new derived data source has proven its value, it may become an input into the next computation.
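
The associativity requirement on Store merges can be illustrated with a small sketch. The merge below is hand-rolled for per-key counts and is not Algebird's API; `AssociativeMerge`, `sequential`, and `chunked` are names invented for this example. Because the merge is associative and has an identity (the empty map), folding all events in order gives the same result as summing independent chunks and merging the partial sums, which is what lets work be split across workers or across a batch layer and a real-time layer.

```scala
object AssociativeMerge {
  // Identity element: merging with the empty map changes nothing.
  val empty: Map[String, Long] = Map.empty

  // Associative merge for per-key counts: values for the same key are added.
  def merge(a: Map[String, Long], b: Map[String, Long]): Map[String, Long] =
    b.foldLeft(a) { case (acc, (k, v)) =>
      acc.updated(k, acc.getOrElse(k, 0L) + v)
    }

  // Sequential: fold every event into one running total.
  def sequential(events: Seq[Map[String, Long]]): Map[String, Long] =
    events.foldLeft(empty)(merge)

  // Chunked: sum each chunk independently (as separate workers could),
  // then merge the partial results. Assumes chunkSize > 0.
  def chunked(events: Seq[Map[String, Long]], chunkSize: Int): Map[String, Long] =
    events
      .grouped(chunkSize)
      .map(chunk => chunk.foldLeft(empty)(merge))
      .foldLeft(empty)(merge)
}
```

For any list of events and any positive chunk size, `sequential(events)` and `chunked(events, n)` agree. A non-associative merge would not have this property.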

Each Platform, such as Storm or Scalding, defines its own notion of these four data concepts and jobs can be written in a way that is completely agnostic to how a particular platform handles the data input and output.
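
As a rough illustration of this separation, the sketch below models the four concepts as simplified traits and writes one job against them. These traits are hypothetical and much simpler than Summingbird's real, Platform-parameterized types; `FourConcepts`, `MemoryStore`, `asService`, and `countByCountry` are names made up for the example.

```scala
import scala.collection.mutable

object FourConcepts {
  // Hypothetical, simplified interfaces (not Summingbird's actual types).
  trait Source[T]     { def events: Iterator[T] }
  trait Service[K, V] { def lookup(key: K): Option[V] }
  trait Store[K, V] {
    def merge(key: K, delta: V): Unit
    def get(key: K): Option[V]
  }
  trait Sink[T] { def write(item: T): Unit }

  // One possible "platform" choice: a store backed by a mutable in-memory map.
  class MemoryStore extends Store[String, Long] {
    private val data = mutable.Map.empty[String, Long]
    def merge(key: String, delta: Long): Unit =
      data.update(key, data.getOrElse(key, 0L) + delta)
    def get(key: String): Option[Long] = data.get(key)
  }

  // Composition: the store of one job can be read as a service by another.
  def asService[K, V](store: Store[K, V]): Service[K, V] =
    new Service[K, V] { def lookup(key: K): Option[V] = store.get(key) }

  // The job depends only on the interfaces, not on how data is read or written.
  def countByCountry(
      users: Source[String],
      countryOf: Service[String, String],
      store: Store[String, Long],
      sink: Sink[(String, String)]
  ): Unit =
    users.events.foreach { user =>
      countryOf.lookup(user).foreach { country =>
        sink.write(user -> country) // export the joined stream
        store.merge(country, 1L)    // aggregate per key
      }
    }
}
```

Swapping `MemoryStore` for a different `Store` implementation would not require any change to `countByCountry`, which is the kind of platform independence described above.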

Summingbird Hatchlings

It took a village and a lot of collaboration to develop Summingbird, and many more projects were spawned because of its existence, notably:

  • Algebird: Algebird is an abstract algebra library for Scala. Many of the data structures included in Algebird have Monoid implementations, making them ideal to use as values in Summingbird aggregations.
  • Bijection: Summingbird uses the Bijection project’s Injection typeclass to share serialization between different execution platforms and clients.
  • Chill: Summingbird’s Storm and Scalding platforms both use the Kryo library for serialization. Chill augments Kryo with a number of helpful configuration options, and provides modules for use with Storm, Scala, and Hadoop. Chill is also used by the Berkeley AMPLab’s Spark project.
  • Tormenta: Tormenta provides a type-safe layer over Storm’s Scheme and Spout interfaces.
  • Storehaus: Summingbird’s client is implemented using Storehaus’s async key-value store traits. The Storm platform makes use of Storehaus’s MergeableStore trait to perform real-time aggregations into a number of commonly used backing stores, including Memcached and Redis.

We’re very excited about growing a community around Summingbird as we move beyond our initial release.

Future Work and Getting Involved

If you’re interested in getting involved, some of our future plans include:

  • Support for more execution platforms, e.g. Spark and Akka
  • Pluggable optimizations for the Producer graph layer
    • Projection and filter pushdown
  • Support for filter-aware data sources like Parquet
  • Libraries of higher-level mathematics and machine learning code on top of Summingbird’s Producer primitives
  • More extensions to Summingbird’s related projects (listed above)
    • More data structures with Monoid instances via Algebird
    • More key-value store implementations via Storehaus
    • More Storm data sources, via Tormenta
  • More tutorials and examples with public data sources

To learn more and find links to tutorials and information around the web, check out the wiki. The latest ScalaDocs are hosted on the project page and discussion occurs primarily on the Summingbird mailing list ([email protected]). Feature requests or bugs should be reported on the GitHub issue tracker. If you’re looking to get involved with the project, introduce yourself on the mailing list and check out issues tagged as newbie for ideas on first contributions.

We also recommend you follow @summingbird to stay in touch; we’ll be listening.

Acknowledgements

Summingbird was originally authored by Oscar Boykin (@posco), Sam Ritchie (@sritchie) and Ashutosh Singhal (@daashu). We would also like to thank Doug Tangren (@softprops), Ryan LeCompte (@ryanlecompte), Aaron Siegel (@asiegel), Bill Darrow (@billdarrow), Brian Wallerstein (@bwallerstein), Wen-Hao Lue (@wlue), Alex Roetter (@aroetter), Zameer Manji (@zmanji) and Dmitriy Ryaboy (@squarecog) for their valuable feedback. Finally, we’d like to thank the community of 40+ contributors to Algebird (18 contributors), Bijection (11 contributors), Chill (6 contributors), Tormenta (3 contributors) and Storehaus (8 contributors).