Querying a Key/Value Database – Learning with Riak

On the continuum of databases, key/value databases are the simplest. Data is stored as an opaque value that's identified by a key. Although key/value databases might seem overly simple, that simplicity hides flexibility. As key/value databases go, Riak provides a solid starting point with a large number of advanced features.

One key, one object... no locks

Key/Value Lookups

Querying a key/value database is simple: you provide the database with a known key and you get back a value. That value can be anything – a PDF, XML, plain text, or the binary representation of an application object.

The beautiful part is that Riak doesn’t care what you’re storing; it just stores bytes.

Although Riak itself only stores a series of bytes on disk, we can keep some knowledge of what we're storing. Metadata is stored with each record: information like the content type, encoding, modification timestamp, and even user-supplied metadata. Although Riak doesn't worry about the value it stores, we can store enough metadata that a program can make the right decisions about the data.

Finding Data

Riak uses buckets to provide a logical namespace for identifiers in the database. You can think of buckets as roughly analogous to tables in an RDBMS. You wouldn't store orders in the Employees table; likewise, you wouldn't want to put an order in the employees bucket. When we query Riak, we ask a single bucket for the value stored at a known key. If you're using CorrugatedIron (the .NET client), retrieving data looks something like this:

var response = _client.Get(ApplicationName, ObjectId);  // fetch by bucket + key
var riakObject = response.Value;                        // raw bytes plus metadata
var employee = riakObject.GetObject<Employee>();        // deserialize into our type

One upside of Riak’s key/value approach is that data access speeds are predictable. Reading a 100KB object from disk will take a predictable amount of time: time to locate data (max disk seek) + time to read data (100KB) + time to stream data to client. Without a lock manager or any of the other intricacies of an RDBMS involved, predicting database response time is relatively easy. We don’t need to be as concerned about concurrent requests, either, because there are many active servers in a Riak cluster. The load will be spread among all of them.
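
To make that concrete, here's a back-of-envelope estimate using the formula above. The hardware numbers (8 ms maximum seek, 100 MB/s sequential disk read, 1 Gb/s network) are illustrative assumptions, not Riak measurements:

```python
# Estimated read latency = seek + disk read + network stream.
# All hardware figures below are assumptions for illustration.

object_kb = 100
seek_ms = 8.0                                          # max disk seek
read_ms = object_kb / (100 * 1024) * 1000              # 100 MB/s sequential read
stream_ms = object_kb * 8 / (1 * 1000 * 1000) * 1000   # 1 Gb/s network

total_ms = seek_ms + read_ms + stream_ms
print(round(total_ms, 2))  # 9.78 ms on these assumptions
```

The point isn't the exact number; it's that every term is bounded, so worst-case latency is easy to reason about.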

Once the data has been retrieved, it's up to the caller to figure out what to do with it. Riak sends back metadata and a stream of bytes; it has no knowledge of what to do with those bytes, so it leaves that decision up to the caller. This is very different from what we're used to with an RDBMS like SQL Server, but there's an advantage to this approach: we can store data in a flexible format – as the application changes, we can easily change the way we store data. There's no need for an all-or-nothing migration that can prevent features from rolling out.

Migrating Data

With SQL Server, if you want to change the way data is stored on disk, you probably need to rewrite all of the data at the same time. This can cause tremendous problems for global businesses – if the fundamental data model behind something changes significantly, migrations can become very complicated.

Remember when I mentioned Riak just stores bytes on the disk? That means if we want to change the way an employee is structured, we can do it by saving the new version of the data. That’s it! From a database perspective, there’s no change; bytes are saved to the disk, bytes are read from the disk. We can slowly change each employee as they’re read, or we could make an upgrade that sweeps across the cluster.

A flexible data schema is a side benefit of a key/value database like Riak. Developers can change the way objects are stored in the application and immediately start saving data in the new format. Migration can happen when old versions of the data are read:

  • The data’s schema is read from metadata, either in the Riak headers or from within the object.
  • If the schema is correct, keep moving!
  • If the schema is out of date, kick off a process to migrate the data to the new schema.
  • Save data with the new schema back to disk and keep moving!
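
The steps above can be sketched in a few lines – Python here for brevity. The field names (schema_version, name) and the v1-to-v2 change are hypothetical; real code would read the version from Riak's user metadata or from within the stored object:

```python
# Read-time schema migration sketch. All field names and versions
# below are hypothetical examples, not a real application's schema.

CURRENT_VERSION = 2

def migrate(record):
    """Upgrade a record one version at a time until it is current."""
    if record.get("schema_version", 1) < 2:
        # v1 stored a single "name" field; v2 splits it in two.
        first, _, last = record.pop("name").partition(" ")
        record["first_name"], record["last_name"] = first, last
        record["schema_version"] = 2
    return record

def read_employee(record):
    """Check the schema version on read; migrate if it's out of date."""
    if record.get("schema_version", 1) < CURRENT_VERSION:
        record = migrate(record)
        # save_employee(record)  # write the upgraded bytes back to Riak
    return record

old = {"schema_version": 1, "name": "Ada Lovelace"}
print(read_employee(old))
```

Records that are already current fall straight through; stale records pay the migration cost once, on their next read.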

Flexibility is a virtue of key/value databases – the database is as flexible as the developers are imaginative.

Riak that house!

Heavy Lifting with Riak

At some point, the business will need functionality more complex than simple key/value lookups. Maybe somebody will want to query a range of data or see an aggregate view of user activity or sales data. With SQL Server, we can easily write a reporting query with multiple joins, aggregations, and group by statements. With Riak, there’s no such feature.

There are two ways to do large reads with Riak – reading multiple keys or writing a MapReduce query.

Before you think I'm crazy for suggesting multiple key reads, hear me out. When you write a query in SQL Server, you're searching for something based on known attributes of the data – customers who live in Oregon who have one dog. For most line-of-business applications, it's even simpler – there is a fixed set of activities that users can perform, and adding any new activity corresponds to adding a new feature. Searches are generally limited in scope and don't give end users exploratory functionality – it's a rare line-of-business application that lets users write their own SQL.

Users request data in ways that are predictable; they want to see new messages, get the price of their auctions, or just see what the top news of the day is. We know what they're going to ask; the behavior of the application is well known. As a result, it's easy to look at a user's activity data, determine the last data they saw, and then retrieve everything newer. The hardest part of this approach is storing the data in a way that makes sense for the application's queries.

To answer a question like “What’s new since I was here last?” we can create many buckets and time-box them. Although buckets are roughly analogous to SQL Server tables, it’s a very rough analogy; buckets are a logical namespace within Riak. In this example, we could create a bucket per minute for messages and name our buckets appropriately: messages-2013-03-01T13:00:00Z, messages-2013-03-01T13:01:00Z, messages-2013-03-01T13:02:00Z. Requesting the new messages since a user’s last login becomes as simple as looking at the user’s last login and the current time, then determining which buckets contain the right messages.
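
Computing the list of buckets to read is straightforward. Here's a sketch in Python using the bucket-per-minute naming scheme from the example above (the dates are illustrative):

```python
# Given a user's last login and the current time, produce the names of
# the time-boxed buckets that hold any messages they haven't seen yet.

from datetime import datetime, timedelta

def minute_buckets(last_login, now, prefix="messages"):
    """Return one bucket name per minute between last_login and now."""
    start = last_login.replace(second=0, microsecond=0)
    buckets = []
    while start <= now:
        buckets.append(start.strftime(prefix + "-%Y-%m-%dT%H:%M:00Z"))
        start += timedelta(minutes=1)
    return buckets

last_login = datetime(2013, 3, 1, 13, 0, 0)
now = datetime(2013, 3, 1, 13, 2, 30)
print(minute_buckets(last_login, now))
# ['messages-2013-03-01T13:00:00Z',
#  'messages-2013-03-01T13:01:00Z',
#  'messages-2013-03-01T13:02:00Z']
```

Each of those buckets can then be fetched with plain key/value reads – no query engine required.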

Submitting many simultaneous requests to Riak will almost certainly be faster than performing a single MapReduce query (we'll get to that in a second). More importantly, we can submit many simultaneous requests to Riak and start displaying information as soon as requests start returning data; there's no need to wait for all the data to be collected and streamed back to our client.

A great deal of application functionality can be written to take advantage of these patterns. Rather than use normalization techniques to reduce storage and duplication, we can write data in ways that fit our application's needs.

What About MapReduce?

MapReduce is on everyone’s lips in the world of NoSQL, distributed databases, and technology punditry. But not all MapReduce is created equal. MapReduce itself refers to a way of thinking about data problems – a function is used to map inputs and then a reducer is applied that performs some kind of aggregation. Riak’s MapReduce operates in this way, too.
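
The shape of the computation looks like this – a minimal, single-process illustration of the pattern, not Riak's actual implementation (Riak runs the map phase on the nodes that hold the data and reduces on a coordinator; the order records here are made up):

```python
# The MapReduce pattern in miniature: a map function transforms each
# stored value, and a reduce function aggregates the mapped results.

orders = [
    {"customer": "a", "total": 10.0},
    {"customer": "b", "total": 25.0},
    {"customer": "a", "total": 5.0},
]

def map_phase(order):
    # Emit just the part of each value we care about.
    return order["total"]

def reduce_phase(values):
    # Aggregate the mapped values.
    return sum(values)

mapped = [map_phase(o) for o in orders]
print(reduce_phase(mapped))  # 40.0
```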

At first glance, it would seem like Riak’s MapReduce functionality would be a great way to handle ad hoc requests for data. Unfortunately, it’s not a good option.

Riak’s MapReduce (MR for short) works like this – a request for an MR job is sent to one server (the coordinator). At this point a large amount of magic happens, but in short: the first map phase is sent to every node in the cluster. The nodes perform the map phase (in part by reading every key in the bucket) and then stream the results back to the coordinator. Astute readers might notice some problems with this approach:

  • All data is streamed back to the coordinator (which may run out of memory).
  • Reading all data in the cluster (or even from a large bucket) may cause performance problems as every node in the cluster becomes busy.

MR has some great functionality in it, but you don’t want to use it for ad hoc querying. What you can do is use MR to retrieve multiple objects in one query (just like issuing multiple simultaneous reads), or you can use MR to transform a large amount of data. One of the best uses of MR is to perform a global update on your data – all data in a bucket will be read, manipulated, and then written back to disk.

Let’s say that the business wants to add two more reports to the application. The reports are complex enough that running them on the fly is expensive. You’ve figured out a way that you could do this by pre-aggregating data on write – it’s a lot more work up front but you can do it asynchronously and it means that getting results back to the user is as simple as a single lookup. There’s no work to do computing and aggregating results when they’re read – the work was done when the data hit disk.
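
Pre-aggregating on write can be sketched like so. The bucket and field names are hypothetical, and an in-memory dict stands in for the summary bucket; in a real application both writes would go to Riak, with the summary update happening asynchronously:

```python
# Pre-aggregation on write: every saved order also updates a running
# daily summary, so the "report" becomes a single key/value read.

summaries = {}  # stand-in for a hypothetical "sales-summary" bucket

def save_order(order):
    # ... write the order itself to its own bucket here ...
    key = order["date"]  # one summary object per day
    summary = summaries.setdefault(key, {"count": 0, "revenue": 0.0})
    summary["count"] += 1
    summary["revenue"] += order["total"]

save_order({"date": "2013-03-01", "total": 19.5})
save_order({"date": "2013-03-01", "total": 5.5})

# Reading the report is now a single lookup – no aggregation at read time.
print(summaries["2013-03-01"])  # {'count': 2, 'revenue': 25.0}
```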

There are other uses for MR, but it’s important for new users to realize that Riak’s MapReduce isn’t going to supply the ad hoc querying capabilities that they’re looking for. MR can be combined with other techniques and technology to supply rich query functionality, but plain MR won’t do it.

N.B. As Gerry points out in the comments, it’s possible to perform a Riak Search or secondary index query to narrow down the initial input to a MapReduce job.

Wrapping Up

Key/value databases like Riak provide a simple way to work with large amounts of data. Riak is an excellent starting point for teams looking to augment SQL Server – it provides an easy-to-use API and can make many workloads easier and faster. Advanced use of Riak may require additional effort from developers and the business alike – it's necessary to know the queries that will be run and adjust the shape of the data to fit them (potentially by storing multiple copies of data), but for many teams this additional work yields great payoffs by providing predictable performance, latency, and scalability.