Android Messaging: Deploying the Octobot
Editor’s Note: This post was compiled and created by Scott Andreas, who can be reached on Twitter at @cscotta.
Last month, we set a goal of improving the throughput of our Android messaging stack. This stack has three primary components, whose interactions are mediated by Beanstalk queues:
1) The frontend API to which app publishers push messages.
2) A worker responsible for preparing these messages for delivery to devices.
3) Our “Helium” cluster – the edge nodes to which Android devices maintain a persistent connection for message delivery.
Previously, we’d discussed the design and implementation of our “Helium” server, a Java NIO-based service designed to maintain active connections to up to 500,000 devices and deliver messages to them in real time. In this post, we move one level up in the stack to look at our message prep tier.
This tier is responsible for six primary tasks: delivering individual messages, delivering broadcast messages for a particular app to every user, delivering messages to every device registered with a specific “tag” for an app, storing pending notifications for devices not online at the time of a push, registering and activating devices as they come online, and recording billing stats reported by our Helium cluster.
An Ambitious Goal
Our initial goal was to be able to deliver a sustained 10,000 messages per second with at most five servers. After setting this goal, we moved to analyze the performance of our (then-)current “Heworker” implemented in Python using Celery. The data access pattern for this task requires large, sequential reads for key/value data, which we read briskly from Cassandra, with supporting data fetched from a MongoDB collection. After data is received, the task prepares two protobufs, encrypts a message, wraps it in an RPC envelope, and passes it on to a Helium server.
We found that on an EC2 c1.medium High Compute instance, our broadcast message delivery rate at the time was roughly 312 messages per second, with delivery to tags at 280 messages per second. After realizing that the vast majority of MongoDB queries could be eliminated by grouping broadcast message delivery into buckets of 20,000 at a time and fetching the supporting data from Mongo with a single “$in” query per bucket, we were able to roughly double the throughput of this task to around 622 messages/second, at which point it became largely CPU-bound. The simple math was slightly disheartening: at roughly 622 messages/second per server, reaching 10,000 messages per second would require at least 16 servers, along with the work of parallelizing delivery of broadcast messages across them.
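To make the bucketing idea concrete, here is a minimal Java sketch of splitting a list of device IDs into buckets of at most 20,000, so that each bucket can back one “$in” lookup rather than one query per device. This is an illustration only, not the code from our worker (which was Python at this point): the class name BucketSketch, the partition helper, and the integer device IDs are all assumptions, and the actual Mongo call is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: names and types are illustrative, not from the real worker.
final class BucketSketch {
    /** Splits ids into consecutive buckets of at most bucketSize elements. */
    static <T> List<List<T>> partition(List<T> ids, int bucketSize) {
        if (bucketSize <= 0) {
            throw new IllegalArgumentException("bucketSize must be positive");
        }
        List<List<T>> buckets = new ArrayList<>();
        for (int start = 0; start < ids.size(); start += bucketSize) {
            int end = Math.min(start + bucketSize, ids.size());
            // Copy the slice so each bucket is independent of the source list.
            buckets.add(new ArrayList<>(ids.subList(start, end)));
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<Integer> deviceIds = new ArrayList<>();
        for (int i = 0; i < 50_000; i++) {
            deviceIds.add(i);
        }
        // Each bucket would be passed to a single "$in" query (not shown here).
        for (List<Integer> bucket : partition(deviceIds, 20_000)) {
            System.out.println("bucket of " + bucket.size() + " ids");
        }
    }
}
```

With 50,000 IDs this yields three buckets, so three supporting-data queries in place of 50,000.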
Enter the Octobot
A few months ago, I wrote, documented, and released a JVM-based task queue worker called “Octobot.” Octobot is a task queue worker for applications that must process message queues with extremely high throughput, minimal latency, and high availability – especially those sensitive to the overhead implicit in receiving and processing many lightweight tasks, such as message delivery systems. Octobot can listen on any number of queues backed by AMQP, Beanstalk, or Redis, with any number of workers for each task queue, with each operating at a configurable priority. Tasks can be written in nearly any JVM language, with sample code available for Java, Scala, Clojure, and JRuby. Configuration is handled by a short YML file, and it runs anywhere Java does. Finally, deployment’s a snap – two jars, an init script, the YML config, and optionally, a logging config and you’re done.
Based on the promising results of a demo, I worked to build out a quick spike of a couple tasks for us to test. Due to more pressing issues, the project was put on hold for some time, with work resuming a few weeks later on a tighter schedule toward a full implementation. We implemented the Cassandra and Mongo data models, ported each of the tasks from Python to Java, and implemented a full test suite using ScalaTest. The re-implementation and testing of this tier took one developer about three weeks.
A Pleasant Surprise
The results were a pleasant surprise. The new Octobot-based worker is capable of delivering about 16,500 tag messages per second, and roughly 13,300 broadcast messages per second per Octoworker instance (a meager c1.medium). Our original goal of 10,000 messages/second across five or fewer machines could now be handled by just one instance at 75% capacity.
Profiling the Embarrassingly Parallel
During development, profiling Octobot and the worker under VisualVM helped us uncover many opportunities for optimization and parallelization of message delivery. To deliver a broadcast message to all devices registered with an application, we need to know which devices are registered (stored in Cassandra) and which Helium server each of them is connected to (stored in Mongo). The new worker minimizes database and IO latency by delivering messages in batches of 20,000 – that is, by fetching all data required for delivery of a batch of messages up front, then parallelizing the CPU-bound delivery tasks across a thread pool.
The breakdown of a broadcast to 10,000 devices looks about like this:
– 61ms round-trip to Cassandra to fetch 10,000 keys from an index.
– 257ms round-trip to Mongo to fetch 10,000 records from a collection.
– 432ms to construct the device messages in parallel and place them in an internal queue for delivery.
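As a quick sanity check on these numbers, the sketch below adds up the three stages and converts the total into a per-second rate. It assumes the stages run one after another for a single batch, which is a simplification; the class and method names are made up for this example.

```java
// Back-of-the-envelope arithmetic only; assumes the three stages run sequentially.
final class BroadcastThroughput {
    /** Messages per second for a batch of the given size that takes totalMillis to process. */
    static double messagesPerSecond(int messages, long totalMillis) {
        return messages * 1000.0 / totalMillis;
    }

    public static void main(String[] args) {
        long cassandraMs = 61;   // fetch 10,000 keys from the index
        long mongoMs = 257;      // fetch 10,000 records from the collection
        long buildMs = 432;      // construct and enqueue the device messages
        long totalMs = cassandraMs + mongoMs + buildMs;
        System.out.printf("total: %d ms, rate: %.0f msg/s%n",
                totalMs, messagesPerSecond(10_000, totalMs));
    }
}
```

The stages sum to 750ms for 10,000 messages, or a little over 13,300 messages per second, which lines up with the broadcast rate quoted earlier.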
The final task here is CPU-bound. With all data required for message generation in memory, it is highly parallelizable. As such, we’ve applied a poor man’s fork/join pattern (think map/reduce, but inside the application itself) by dividing the workload across a number of worker threads. This strategy enables us to take advantage of all processors in the system. While EC2’s c1.medium instances expose two cores, the application will parallelize equally well across four, eight, twelve, or more cores. As elsewhere but especially here, we’re thankful to be programming in an environment that allows for concurrency as well as parallelism within a single process.
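For readers who want to see the shape of this, here is a minimal sketch of the divide-and-join approach using a plain fixed-size thread pool from java.util.concurrent (not the ForkJoinPool framework, and not our production code). The buildMessage method is a hypothetical stand-in for the real CPU-bound work of building, encrypting, and wrapping a device message; the class name and chunking scheme are likewise assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ParallelPrepSketch {
    /** Hypothetical stand-in for the CPU-bound work of preparing one device message. */
    static String buildMessage(int deviceId) {
        return "msg-for-" + deviceId;
    }

    /** Splits deviceIds into roughly one chunk per thread, builds in parallel, joins in order. */
    static List<String> buildAll(List<Integer> deviceIds, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            int chunkSize = Math.max(1, (deviceIds.size() + threads - 1) / threads);
            List<Callable<List<String>>> tasks = new ArrayList<>();
            for (int start = 0; start < deviceIds.size(); start += chunkSize) {
                int end = Math.min(start + chunkSize, deviceIds.size());
                final List<Integer> chunk = deviceIds.subList(start, end);
                tasks.add(() -> {
                    List<String> out = new ArrayList<>(chunk.size());
                    for (int id : chunk) {
                        out.add(buildMessage(id));
                    }
                    return out;
                });
            }
            List<String> all = new ArrayList<>(deviceIds.size());
            // invokeAll blocks until every chunk finishes and returns futures in task order.
            for (Future<List<String>> chunkResult : pool.invokeAll(tasks)) {
                all.addAll(chunkResult.get());
            }
            return all;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            ids.add(i);
        }
        int threads = Runtime.getRuntime().availableProcessors();
        List<String> messages = buildAll(ids, threads);
        System.out.println("built " + messages.size() + " messages on " + threads + " threads");
    }
}
```

Sizing the pool from availableProcessors() is what lets the same code spread across two cores on a c1.medium or more on a larger machine. Because the input data is already in memory and each chunk is independent, the threads need no coordination until the final join.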
Parallel Message Delivery Implementation
Beyond its raw speed, the Octobot architecture brings significant enhancements to our infrastructure in terms of monitoring and introspection. With a quick Munin plugin, we’re provided with graphs of average execution time, successes, failures, and retries per task. Failures result in an error notification e-mail detailing the incoming task, the hostname of the instance that executed it, arguments supplied, and a stacktrace if one is available. This makes for quick debugging and updates.
All told, we’re very pleased by how the project turned out. If you’re in the process of implementing a distributed system that requires high-throughput background processing, a similar strategy might be worth considering.
If you’re interested in evaluating Octobot for your needs, check it out at http://octobot.taco.cat/. The 1.0 release is just about a week away, with additional samples on the way for more JVM languages, better documentation, instrumentation, and monitoring. It has processed well over 5 million tasks for us in the past week and has been well-behaved.
If the fork/join pattern is new to you and your work is parallelizable, you might enjoy Doug Lea’s “A Java Fork/Join Framework” (PDF).
And last but not least, if you’re interested in implementing Push Notifications and remote messaging for your Android application, grab an account! You get a million messages free per month, delivered instantly and securely to your users’ devices.