Handling five billion sessions a day – in real time

Tuesday, 17 February 2015

Since we first released Answers seven months ago, we’ve been thrilled by tremendous adoption from the mobile community. We now see about five billion sessions per day, and that number is growing. Hundreds of millions of devices send millions of events every second to the Answers endpoint. In the time it took you to read this far, the Answers back-end will have received and processed about 10,000,000 analytics events.

The challenge for us is to use this information to provide app developers with reliable, real-time and actionable insights into their mobile apps.

At a high level, we base our architectural decisions on the principles of decoupled components, asynchronous communication and graceful service degradation in response to catastrophic failures. We use the Lambda Architecture to combine data integrity with real-time data updates.

In practice, we need to design a system that receives events, archives them, performs offline and real-time computations, and merges the results of those computations into coherent information. All of this needs to happen at the scale of millions of events per second.

Let’s start with our first challenge: receiving and handling these events.

Event reception
When designing our device-server communication, we had three goals: reducing the impact on battery and network usage, ensuring data reliability, and delivering the data as close to real time as possible. To reduce the impact on the device, we send analytics events in batches and compress them before sending. To ensure that valuable data always gets to our servers, devices retry failed data transfers after a randomized back-off and keep unsent data up to a disk size limit on the device. To get events to the servers as soon as possible, several triggers cause the device to attempt a transfer: a time trigger that fires every few minutes while the app is in the foreground, a trigger based on the number of accumulated events, and a trigger that fires when the app goes into the background.
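The batching, compression and randomized back-off described above can be sketched roughly as follows. This is a minimal illustration, not the actual Answers SDK: the JSON encoding, the function names (encode_batch, backoff_delay, send_with_retry), the exponential growth of the delay and all numeric defaults are assumptions made for the example.

```python
import gzip
import json
import random


def encode_batch(events):
    """Serialize a batch of events to JSON and gzip it into one payload."""
    return gzip.compress(json.dumps(events).encode("utf-8"))


def backoff_delay(attempt, base=1.0, cap=300.0, rng=random):
    """Randomized back-off: a delay drawn from [0, min(cap, base * 2**attempt)]."""
    return rng.uniform(0.0, min(cap, base * (2 ** attempt)))


def send_with_retry(payload, send, max_attempts=5, sleep=lambda seconds: None):
    """Try to send a payload, waiting a randomized delay after each failure.

    `send` is a hypothetical transport callable that returns True on success.
    The default `sleep` is a no-op so the sketch runs instantly; a real client
    would pass time.sleep. On a False result the caller would keep the payload
    on disk (subject to its size limit) and try again on the next trigger.
    """
    for attempt in range(max_attempts):
        if send(payload):
            return True
        sleep(backoff_delay(attempt))
    return False
```

Because each device draws its own random delay, devices that failed at the same moment do not all retry at the same moment.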

This communication protocol results in devices sending us hundreds of thousands of compressed payloads every second. Each of these payloads may contain tens of events. To handle this load reliably and in a way that allows easy linear scaling, we wanted the service that accepts the events to be dead simple.

This service is written in Go, fronted by Amazon Elastic Load Balancer (ELB), and simply enqueues every payload that it receives into a durable Kafka queue.

Because Kafka writes the messages it receives to disk and supports keeping multiple copies of each message, it is a durable store. Once the information is in Kafka, we know that we can tolerate downstream delays or failures by processing, or reprocessing, the messages later. However, Kafka is not the permanent source of truth for our historic data: at the incoming rate of information that we see, we’d need hundreds of boxes to store all of the data for just a few days. So we configure our Kafka cluster to retain information for a few hours (enough time for us to respond to any unexpected, major failures) and get the data to our permanent store, Amazon Simple Storage Service (Amazon S3), as soon as possible.
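To make the retention trade-off concrete, here is a toy in-memory model of a log with a retention window and offset-based reads. It is only a sketch of the idea and does not use or imitate Kafka's client API; the class name RetainedLog and its methods are invented for this illustration.

```python
from collections import deque


class RetainedLog:
    """Toy append-only log that keeps messages for a fixed retention window."""

    def __init__(self, retention_seconds):
        self.retention_seconds = retention_seconds
        self.entries = deque()  # items are (offset, timestamp, message)
        self.next_offset = 0

    def append(self, message, now):
        """Append a message stamped with `now`, then drop expired entries."""
        self.entries.append((self.next_offset, now, message))
        self.next_offset += 1
        self._expire(now)

    def _expire(self, now):
        # Entries older than the retention window are gone for good.
        while self.entries and now - self.entries[0][1] > self.retention_seconds:
            self.entries.popleft()

    def read_from(self, offset):
        """Return (messages, next_offset) for retained entries at or after `offset`."""
        messages = [m for (o, _, m) in self.entries if o >= offset]
        return messages, self.next_offset
```

A consumer that remembers its last offset can stop, restart and resume where it left off, or rewind to reprocess, as long as the messages it needs are still inside the retention window. That is why the data has to reach the permanent store well within those few hours.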

We use Storm extensively for real-time data processing, and the first relevant topology is one that reads the information from Kafka and writes it to Amazon S3.

Batch computation
Once the data is in Amazon S3, we can compute anything that our data allows via Amazon Elastic MapReduce (Amazon EMR). This includes batch jobs for all of the data that our customers see in their dashboards, as well as experimental jobs as we work on new features.

We write our MapReduce jobs in Cascading and run them via Amazon EMR. Amazon EMR reads the data that we’ve archived in Amazon S3 as input and writes the results back out to Amazon S3 once processing is complete. We detect the jobs’ completion via a scheduler topology running in Storm and pump the output from Amazon S3 into a Cassandra cluster in order to make it available for sub-second API querying.

Speed computation
What we have described so far is a durable and fault-tolerant framework for performing analytic computations. There is one glaring problem, however: it’s not real time. Some computations run hourly, while others require a full day of data as input. The computation times range from minutes to hours, as does the time it takes to get the output from Amazon S3 to a serving layer. Thus, at best, our data would always be a few hours behind, and wouldn’t meet our goals of being real time and actionable.

To address this, in parallel to archiving the data as it comes in, we perform stream computations on it.

An independent Storm topology consumes the same Kafka topic as our archival topology and performs the same computations that our MapReduce jobs do, but in real time. The outputs of these computations are written to a separate, independent Cassandra cluster for real-time querying.

To compensate for the fact that we have less time, and potentially fewer resources, in the speed layer than in the batch layer, we use probabilistic algorithms like Bloom Filters and HyperLogLog (as well as a few homegrown ones). These algorithms enable us to make order-of-magnitude gains in space and time complexity over their brute-force alternatives, at the price of a negligible loss of accuracy.
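As an illustration of the space-for-accuracy trade, here is a minimal Bloom filter used to count devices that have not been seen before. It is a generic textbook sketch, not the implementation used in Answers: the sizes, the SHA-256-based double hashing and the helper approx_unique are assumptions chosen for the example.

```python
import hashlib


class BloomFilter:
    """Fixed-size set membership with no false negatives and rare false positives."""

    def __init__(self, num_bits=8192, num_hashes=4):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, item):
        # Derive k bit positions from two halves of one digest (double hashing).
        digest = hashlib.sha256(item.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, item):
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(item))


def approx_unique(device_ids, seen):
    """Count ids not already in `seen`, adding each new one to the filter.

    A false positive makes a new id look already seen, so the result can
    undercount slightly but never overcounts.
    """
    count = 0
    for device_id in device_ids:
        if device_id not in seen:
            seen.add(device_id)
            count += 1
    return count
```

The filter above occupies 1 KiB no matter how many ids pass through it, whereas an exact set grows with every distinct id. The false-positive rate rises as the filter fills, so the bit count has to be sized for the expected number of items.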

Fitting it together
So now that we have two independently produced sets of data (batch and speed), how do we combine them to present a single coherent answer?

We combine them with logic in our API that utilizes each data set under specific conditions.

Because batch computations are repeatable and more fault-tolerant than speed computations, our APIs always favor batch-produced data. So, for example, if our API receives a request for data for a thirty-day, time-series DAU graph, it will first request the full range from the batch-serving Cassandra cluster. If this is a historic query, the entire request will be satisfied there. However, in the more likely case that the query includes the current day, the query will be satisfied mostly by batch-produced data, and just the most recent day or two will be satisfied by speed data.
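The "batch first, speed for the remainder" rule can be sketched as a small merge function. This is a hedged illustration of the idea rather than the real API code: the function merge_series is hypothetical, and the two dicts stand in for the batch-serving and speed-serving Cassandra clusters.

```python
from datetime import date, timedelta


def merge_series(start, end, batch, speed):
    """Build a daily series for [start, end], preferring batch values.

    `batch` and `speed` are hypothetical dicts mapping date -> value.
    A day missing from batch falls through to speed; a day missing from
    both maps to None.
    """
    series = {}
    day = start
    while day <= end:
        if day in batch:
            series[day] = batch[day]
        else:
            series[day] = speed.get(day)
        day += timedelta(days=1)
    return series
```

With this shape, a delayed batch layer needs no special handling: more days are simply absent from the batch results, so more days are answered from the speed layer until the batch output catches up.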

Handling of failure scenarios
Let’s go over a few failure scenarios and see how this architecture allows us to degrade gracefully instead of going down or losing data when faced with them.

We already discussed the on-device retry-after-back-off strategy. The retry ensures that data eventually gets to our servers in the presence of client-side network unavailability or brief server outages on the back-end. A randomized back-off ensures that devices don’t overwhelm (DDoS) our servers after a brief network outage in a single region or a brief period of unavailability of our back-end servers.

What happens if our speed (real-time) processing layer goes down? Our on-call engineers will get paged and address the problem. Since the input to the speed processing layer is a durable Kafka cluster, no data will have been lost and once the speed layer is back and functioning, it will catch up on the data that it should have processed during its downtime.

Since the speed layer is completely decoupled from the batch layer, batch layer processing will go on unaffected. Thus the only impact is a delay in real-time updates to data points for the duration of the speed layer outage.

What happens if there are issues or severe delays in the batch layer? Our APIs will seamlessly query for more data from the speed layer. A time-series query that may have previously received one day of data from the speed layer will now query it for two or three days of data. Since the speed layer is completely decoupled from the batch layer, speed layer processing will go on unaffected. At the same time, our on-call engineers will get paged and address the batch layer issues. Once the batch layer is back up, it will catch up on delayed data processing, and our APIs will once again seamlessly use the batch-produced data that is now available.

Our back-end architecture consists of four major components: event reception, event archival, speed computation, and batch computation. Durable queues between these components ensure that an outage in one component does not spill over to the others and that we can later recover from the outage. Query logic in our APIs allows us to degrade gracefully when one of the computation layers is delayed or down, and to recover seamlessly when it comes back up.

Our goal for Answers is to create a dashboard that makes understanding your user base dead simple so you can spend your time building amazing experiences, not digging through data. Learn more about Answers here and get started today.

Big thanks to the Answers team for all their efforts in making this architecture a reality. Thanks also to Nathan Marz for his Big Data book.

Andrew Jorgensen, Brian Swift, Brian Hatfield, Michael Furtak, Mark Pirri, Cory Dolphin, Jamie Rothfeder, Jeff Seibert, Justin Starry, Kevin Robinson, Kristen Johnson, Marc Richards, Patrick McGee, Rich Paret, Wayne Chang.