AOL, Meet Riak

March 21, 2011

In this post I describe why I like Riak and how I made use of it at my day job with AOL.

Hi, I Work At AOL

Yes, you read the title right. It says A O L. You know, America Online. That place that still makes money off people who fail to realize they don't need both Comcast and AOL to check email. People like my loving mother, still supporting me to this day. Hi mom!

I work for a part of the company called AOL Advertising/Advertising.com on a project called AdLearn. AdLearn is the brains behind AOL's network advertising business. It attempts to find the best ad for a given impression. An impression is the displaying of an ad on a piece of inventory. A piece of inventory might be a banner on a website, for example. AdLearn takes many factors into account including past performance, targeting, campaign type, etc. to schedule the right ad for any given impression. With the help of Riak I wrote a system that plays a small, but significant, role in AdLearn 1.

Why I Chose Riak

It's very easy for me to like Riak as there are so many great things it brings to the table. Many of Riak's high points can be attributed to its lineage from Amazon's Dynamo 2. Things like consistent hashing, vector clocks, hinted handoff, Merkle trees, and configurable CAP properties all add up to a whole greater than the sum of its parts. I can read that paper and have a general, high-level understanding of how Riak works its magic. I can also take that knowledge with me to any other Dynamo-based solution. Here's how I put those pieces to work.

What I Did With Riak

I made use of riak-kv, riak-search, and riak-core to help distribute my application's data, state, and processing, respectively. These technologies provided me with a framework I could use instead of trying to build the functionality myself. This allowed me to put the entire application behind a dumb splitter and call it a day. A horizontal scale out is a simple matter of standing up a new node, joining it to the cluster, and adding an entry to the splitter. Read that last sentence again -- let it sink in.
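
For a flavor of how little ceremony that join takes, here is a minimal sketch of the step from the new node's Erlang console; the existing node's name is made up for illustration, and the riak-admin join command does essentially the same thing from the shell.

    %% Minimal sketch (hypothetical node name): join this freshly started
    %% node to a node that is already a member of the cluster.
    riak_core:join('riak@node1.prod.example.com').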

riak-kv

I used riak-kv as a distributed, highly available cache for various types of data indexed by campaign id. In the advertising world, or at least at Ad.com, we refer to a run of advertisements as a campaign. The systems that make up AdLearn tend to think in terms of campaigns and therefore it was a logical choice to index on.

It is key (excuse the pun) to remember that a Dynamo-inspired system is designed to index on one primary key. If you're looking for ad hoc SQL queries with secondary indexes and distribution added on, then you are barking up the wrong tree. You can use riak-search to satisfy some of your secondary index needs, but it's no replacement for an RDBMS when that's what you need.

The application I built talks to riak-kv via protocol buffers. It inserts serialized Erlang terms indexed by the campaign id. These terms are proplists (associative lists, for those not hip to Erlang) which contain a key with a binary data payload as the value. During a retrieval my application makes a call to riak-kv for the specific campaign, deserializes the term, and then extracts the binary datum to return. Since the AdLearn system, as a whole, is resilient to stale data I use an R value of 1, which allows lower-latency gets at the risk of returning stale data. To further optimize this operation I also added a local, in-memory, short-lived cache of this data to handle subsequent retrievals of the same data.
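
To make the shape of this concrete, here is a minimal sketch of that put/get path, assuming a Riak node listening for protocol buffers on localhost:8087; the bucket name, proplist key, and campaign id are placeholders for illustration, not the names used in the real system.

    %% Sketch only: store a serialized proplist keyed on campaign id,
    %% then fetch it back with R=1.
    {ok, Pid} = riakc_pb_socket:start_link("127.0.0.1", 8087),

    CampaignId = <<"12345">>,
    Value = term_to_binary([{payload, <<"binary datum goes here">>}]),
    Obj = riakc_obj:new(<<"campaign_cache">>, CampaignId, Value),
    ok = riakc_pb_socket:put(Pid, Obj),

    %% R=1 means the first replica to answer wins: lower latency,
    %% possibly stale data, which AdLearn tolerates.
    {ok, Fetched} = riakc_pb_socket:get(Pid, <<"campaign_cache">>, CampaignId, [{r, 1}]),
    Term = binary_to_term(riakc_obj:get_value(Fetched)),
    Datum = proplists:get_value(payload, Term).

The local, short-lived cache mentioned above simply sits in front of this call path and answers repeat requests for the same campaign without touching Riak.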

riak-search

The application I built uses riak-search as a distributed file index. Let me explain. This application gets CSV data posted to it from many different clients. From this data, it builds CSV files to be retrieved. In short, it takes some CSVs as input, does some processing on them, creates some new CSVs, and returns those new CSVs as output to requesting clients. Simple enough, right?

However, there is metadata that my application needs to store about this data. This metadata includes information about when it was first written to the system, the type of data it contains, a temporal property that describes at what point in time this data is applicable, and some other attributes about the data being stored. In order to build the output, queries need to be made against this metadata. For example, what are all the CSVs of a certain type stored after a certain time? riak-search makes this easy for me.

  1. Designate a bucket to write all indexed entries to.

  2. Create a schema for the fields you want to index.

  3. Install the pre-commit hook and install the schema for the designated bucket.

  4. Since the objects written to the bucket are serialized Erlang proplists, I set the extractor to riak_search_kv_erlang_binary_extractor:extract.

  5. Query the index using riakc_pb_socket:search. I also wrote a bit of code on top of it to further refine the results: ordering by a certain field, inequality queries, limiting the result count, and so on. There's a rough sketch of this step below.

Bam! Now querying for files based on their metadata is fast and distributed, allowing it to be performed on any node in the cluster with sub-second latency.
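
Here is a rough sketch of writing an indexed metadata entry and then querying it (step 5), assuming the pre-commit hook, schema, and extractor are already installed on the bucket; the bucket name, field names, and query values are placeholders, not the real ones.

    %% Sketch only: write a metadata proplist to the indexed bucket
    %% (the erlang_binary extractor pulls fields out of the
    %% term_to_binary'd proplist), then query the index.
    {ok, Pid} = riakc_pb_socket:start_link("127.0.0.1", 8087),

    Meta = [{type, <<"daily">>},
            {written_at, <<"20110321120000">>},
            {applies_to, <<"20110320">>}],
    Obj = riakc_obj:new(<<"csv_meta">>, <<"csv/20110320/daily.csv">>, term_to_binary(Meta)),
    ok = riakc_pb_socket:put(Pid, Obj),

    %% All CSVs of a certain type stored after a certain time.
    Query = <<"type:daily AND written_at:[20110301000000 TO 20110321235959]">>,
    {ok, Results} = riakc_pb_socket:search(Pid, <<"csv_meta">>, Query).

The bit of code mentioned in step 5 then orders, filters, and limits Results before the output CSVs are built.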

riak-core

My use of riak-core is probably slightly controversial. I basically used it to do something like gen_leader although I'm sure gen_leader is way more sophisticated and robust than what I put together. In fact, I wouldn't even put the two in the same universe.

My application needs a master process to control the creation of these CSVs across the entire cluster. These files can take minutes to build and use a lot of CPU, so only one build process should run at a time on any given machine. Furthermore, I need the master to distribute these processes across the cluster so that the work can be done in parallel across physical nodes. So while one CSV is being built on node A, a different one is being built on node B. When one CSV is finished, the worker sends a message back to its master and a new worker is put in its place if there is more work to be done. Finally, there is a master process running on each node in the cluster. Each process has a heartbeat of 10 seconds. When a beat occurs, that master needs to determine:

  1. Who is deemed the "active" master?

  2. If I am, then run. Otherwise, is the active master alive and running?

  3. If so, do nothing. Otherwise, set myself as the active master and get to work.

This is where riak-core comes into play. At the heart of riak-core is something called the ring. The ring is a small amount of state about the cluster that is gossiped between the nodes. It's one of the things that allows Riak to rebuild itself after it has fallen down. I used the ring as well as riak-core's service discovery APIs to determine the state of the master and take failover action if needed. If the nodes partition or go down, then each fragmented cluster or individual node will take action and assume it's the new master.
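
Here is a rough sketch of what that heartbeat check could look like on top of riak-core's node watcher; the csv_builder service name and the "lowest node name wins" tie-break are placeholders of mine, not necessarily how the original code picked the active master.

    %% Sketch only: each node registers a (hypothetical) csv_builder
    %% service with riak-core; on every heartbeat, the master whose node
    %% name sorts first among the nodes it can currently see does the work.
    -module(csv_master_sketch).
    -export([announce/1, maybe_run/0]).

    announce(MasterPid) ->
        riak_core_node_watcher:service_up(csv_builder, MasterPid).

    maybe_run() ->
        case lists:sort(riak_core_node_watcher:nodes(csv_builder)) of
            [Active | _] when Active =:= node() ->
                run;        %% I am the active master, so get to work
            [_Active | _] ->
                standby;    %% another master is alive and visible, do nothing
            [] ->
                run         %% nobody visible, assume mastership
        end.

In a partition each side sees only its own members, so each fragment elects its own active master, which is exactly the failover behavior described above.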

Results

I've had a 4-node cluster handling a near-production load in my development environment for over a month. A 5-node cluster has been deployed to a staged production environment and will become fully active in the coming weeks. It won't be seen outside of AOL's firewall, and it's not [directly] supporting millions of users. No, but it is playing a non-trivial role in a system that delivers over 1.5 billion impressions a day. That's gotta be worth something.


  1. Find out more here and here.

  2. How much Dynamo concentrate does Riak have? Check out slide 8 of this presentation by Rusty Klophaus.

  3. Check out all the backend goodness.