As part of Factual’s Geopulse product suite, we need to be able to absorb and process large amounts of data, and deliver back a somewhat smaller amount of data. There is a significant amount of technology available for the processing stage, but much less for intake and delivery. Today, we’re open sourcing two libraries that we’ve used for these purposes, s3-journal and riffle. Both of these libraries are notable for making efficient use of persistent storage by avoiding random writes, which will be discussed in more detail later in this post.
On the intake side, we need to persist the data we’re being sent somewhere Hadoop can get at it. At the volume we were receiving it (billions of daily entries, comprising terabytes of compressed text), S3 seemed to be the most straightforward place to put it. While we don’t make a habit of losing data, guaranteed delivery to S3 upon receiving a particular entry was not a requirement. This meant that we could implement our intake service as a horizontally scalable set of servers, accepting data and uploading it in batches to S3.
At first, we used the Hadoop S3 client, which we had used in some lightweight use cases before. Unfortunately, it ended up coping poorly. Each server had a sustained throughput of about 10 megabytes of compressed data per second, and every so often S3 would hiccup and stop accepting data for a few minutes. When this happened, Hadoop’s client would simply buffer the data in-memory until all the memory was exhausted, at which point the process would become non-responsive. That server’s traffic would then spill over onto the other servers, and the process would be repeated.
One solution would be to give each process so much memory that they could survive any S3 outage of reasonable duration, but this is both fragile in the face of unreasonably long outages, and fragile with respect to process death: arbitrarily large in-memory buffers enable arbitrarily large data loss. Given this, a much better solution is to write the buffer to disk, since that gives us both much more space to work with and makes us robust to process death.
A downside to this approach is that storing and retrieving things in memory is much, much faster than accessing things on disk, even if the disk is solid state. The library underlying s3-journal, durable-queue, uses append-only slab-allocated buffers to minimize I/O overhead, but this doesn’t change the fundamental capabilities of the hardware.
So in addition to minimizing overhead, s3-journal writes entries to disk in batches constrained by the number of entries, the maximum time allowed to elapse between the first and last entry, or both. These entries are written and then read out as a single binary blob, allowing us a high effective throughput measured in entries/sec without saturating the throughput of the disk controller. The downside to this, obviously, is the same as with the previous client: data buffered in memory may be lost forever. However, the issue with the other client was less that any data could be lost, but rather that the amount of data that could be lost was unbounded. By exposing parameters for how often s3-journal flushes to disk, people using the library can find their own golden mean between throughput and durability.
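The batching rule described above can be sketched in a few lines. This is only an illustration in Python, not s3-journal’s actual code or API: the `Batcher` class, its parameter names, and the newline-joined blob format are all assumptions made for the example. For simplicity the time limit is only checked when a new entry arrives; a real implementation would also flush from a timer.

```python
import time


class Batcher:
    """Hypothetical sketch: buffer entries, flush them as one blob when a limit is hit."""

    def __init__(self, flush, max_entries=1000, max_seconds=1.0, clock=time.monotonic):
        self.flush = flush              # callback that persists one binary blob
        self.max_entries = max_entries  # bound on entries held in memory
        self.max_seconds = max_seconds  # bound on time between first and last entry
        self.clock = clock
        self.entries = []
        self.first_at = None

    def put(self, entry: bytes):
        now = self.clock()
        if not self.entries:
            self.first_at = now         # the batch window starts at its first entry
        self.entries.append(entry)
        too_many = len(self.entries) >= self.max_entries
        too_old = now - self.first_at >= self.max_seconds
        if too_many or too_old:
            self.flush_now()

    def flush_now(self):
        if self.entries:
            # One sequential write per batch instead of one write per entry.
            self.flush(b"\n".join(self.entries))
            self.entries = []
            self.first_at = None
```

Whatever is sitting in `entries` when the process dies is lost, so the two limits are exactly the knobs that bound that loss.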
Since we started using it at the beginning of the year, s3-journal has successfully persisted over a trillion entries into S3, comprising over half a petabyte of textual data. Given that it’s built on top of a service which is under continuous development, and that new versions may be deployed without our knowledge at any time, there may be new and exciting failure modes introduced in the future. But so far it has proven a simple and reliable way to get our data safely persisted, and we expect it will be a useful tool for a wide variety of other applications.
After the data is persisted and processed, we need to get it back to our customers in a form they find useful. For some, we can simply expose it via our public API, but many of them are latency sensitive enough that the data needs to be on-premise. Of those customers, some already have data stores that can be used to serve up our data (which is just a large number of key/value pairs), but many do not. Given this, and given our desire to make the integration as straightforward as possible, we figured we’d just provide one for them.
There exist today a surprising number of in-process key/value stores, including LevelDB, LMDB, RocksDB, and many others. Each of these provides some variation on the “database as library” use case, where a process needs to persist and look up certain values by key. Our first version took LevelDB, which had decent Java bindings, added some background syncing logic, and wrapped an HTTP server around it.
At first, this worked quite well. Starting from an empty initial state, we were able to quickly populate the database, all the while maintaining an impressively low latency on reads. But then the size of the database exceeded the available memory.
LevelDB uses memory-mapping to keep its index files in memory regions that can be quickly accessed by the process. However, once the size of the memory-mapped regions exceeds the available memory, some will be evicted from memory, and only refetched if the region is accessed again. This works well if only some of the regions are “hot”: they can stay in memory and the others can be lazily loaded on demand. Unfortunately, our data had a uniform access pattern, which meant that regions were being continuously evicted and reloaded.
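To see why a uniform access pattern hurts so much, here is a small simulation. It is a deliberately crude model, not a description of how LevelDB or the operating system actually manage pages: memory is modelled as a plain LRU cache of pages, and the page counts and the 90/10 skew are made-up numbers for illustration.

```python
import random
from collections import OrderedDict


def hit_rate(accesses, cache_pages):
    """Fraction of accesses served from an LRU cache holding `cache_pages` pages."""
    cache = OrderedDict()
    hits = 0
    total = 0
    for page in accesses:
        total += 1
        if page in cache:
            hits += 1
            cache.move_to_end(page)        # mark as most recently used
        else:
            cache[page] = True             # "load from disk"
            if len(cache) > cache_pages:
                cache.popitem(last=False)  # evict the least recently used page
    return hits / total


rng = random.Random(0)                      # fixed seed so runs are repeatable
TOTAL_PAGES, CACHE_PAGES, N = 1000, 250, 20000

# Uniform: every page is equally likely to be read.
uniform = [rng.randrange(TOTAL_PAGES) for _ in range(N)]
# Skewed: 90% of reads go to a "hot" 10% of the pages.
skewed = [rng.randrange(100) if rng.random() < 0.9 else rng.randrange(TOTAL_PAGES)
          for _ in range(N)]

uniform_rate = hit_rate(uniform, CACHE_PAGES)
skewed_rate = hit_rate(skewed, CACHE_PAGES)
print(f"uniform: {uniform_rate:.2f}  skewed: {skewed_rate:.2f}")
```

Under uniform access the hit rate settles near the ratio of cache size to data size (about a quarter in this setup), so most reads go to disk no matter which eviction policy is used. With a hot subset that fits in memory, the same cache serves the large majority of reads.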
Even worse, once the database grew past a threshold, write throughput plummeted. This is because of write amplification, which is a measure of how many bytes need to be written to disk for each byte written to the database. Since most databases need to keep their entries ordered, a write to the middle of an index will require at least half the index to be rewritten. Most databases will offset this cost by keeping a write-ahead log, or WAL, which persists the recent updates in order of update, and then periodically merges these changes into the main index. This amortizes the write amplification, but the overhead can still be significant. In our case, with a 100GB database comprising 100 million entries, it appeared to be as high as 50x.
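To put that 50x figure in perspective, here is the arithmetic as a short Python sketch. The database size, entry count, and amplification factor are the ones quoted above; the batch size (10% of the entries) is a hypothetical number chosen only to make the example concrete.

```python
def write_amplification(bytes_written_to_disk, bytes_written_to_db):
    """Bytes physically written per byte logically written."""
    return bytes_written_to_disk / bytes_written_to_db


GB = 10**9
db_size = 100 * GB                 # from the text
entries = 100_000_000              # from the text
amplification = 50                 # from the text ("as high as 50x")

avg_entry = db_size // entries     # average entry size in bytes

# Hypothetical batch: update 10% of the entries.
batch_logical = (entries // 10) * avg_entry
batch_physical = batch_logical * amplification

print(avg_entry, "bytes per entry")
print(batch_logical // GB, "GB of updates")
print(batch_physical // GB, "GB actually written to disk")
```

In this hypothetical, a batch that touches a tenth of the data causes several times the size of the whole database to be written to disk.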
So this meant that each time we had a batch of keys to update, it was not only slow (and getting slower), but it saturated the I/O throughput of the system, greatly degrading the read latency. This degraded state would persist until the updates were done, which could take as long as a day.
One solution we investigated was using RocksDB, which is a fork of LevelDB made in response to precisely the sort of production behavior we saw. But while RocksDB was better, it was still fundamentally a solution to a difficult problem we didn’t care about: random writes to disk. Our usage pattern was almost all reads, except for batch updates every week or so. Rather than write to the database, we could simply write a new database. And so we did.
Previous examples of this approach include DJ Bernstein’s cdb and Google’s sorted-string tables, which were first introduced in the BigTable paper, and have gone on to be used extensively within Google (including as the underlying storage format in LevelDB) and elsewhere. Both formats are similar in their intent: an index maps keys onto the location of the values on disk, meaning that values can generally be fetched in a single read from disk.
There are, however, some meaningful differences between the two. SSTables keep the entire set of keys in memory, so that the file offset for the value can be immediately determined. The cdb format, conversely, uses an in-file hashtable that maps hashed keys onto offsets. The possibility of hash collisions means that there may be more than one lookup within the file, but this means our memory usage is not determined by the size of our keys. Even if the hashtable is memory-mapped, as it usually is, the cost per key is fixed (typically ~10 bytes per key), rather than dependent on the size of the keys themselves. The original spec for cdb only allows for databases smaller than 4 gigabytes, but this can be extended to a terabyte by adding 8 bits onto the hashtable’s address space, and sharding can enable multi-terabyte indices.
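The idea of an in-file hashtable with a fixed cost per key can be sketched as follows. This is not the actual cdb layout (cdb uses its own hash function and 256 sub-tables) and not Riffle’s format either: the header, the record framing, the CRC32 hash, the 12-byte slots, and the linear probing are all assumptions made to keep the example short. Keys are assumed to be unique.

```python
import struct
import zlib

HEADER = struct.Struct("<QI")  # table offset, number of slots
RECORD = struct.Struct("<II")  # key length, value length
SLOT = struct.Struct("<IQ")    # key hash, record offset (0 marks an empty slot)


def build(pairs):
    """Serialize (key, value) byte pairs into a single immutable blob."""
    data = bytearray(HEADER.size)          # reserve space for the header
    located = []
    for key, value in pairs:
        located.append((zlib.crc32(key), len(data)))
        data += RECORD.pack(len(key), len(value)) + key + value
    num_slots = max(1, 2 * len(located))   # keep the table at most half full
    slots = [(0, 0)] * num_slots
    for h, offset in located:
        i = h % num_slots
        while slots[i][1] != 0:            # linear probing on collision
            i = (i + 1) % num_slots
        slots[i] = (h, offset)
    table_offset = len(data)
    for h, offset in slots:
        data += SLOT.pack(h, offset)
    HEADER.pack_into(data, 0, table_offset, num_slots)
    return bytes(data)


def lookup(blob, key):
    """Return the value for `key`, or None if it is absent."""
    table_offset, num_slots = HEADER.unpack_from(blob, 0)
    h = zlib.crc32(key)
    i = h % num_slots
    for _ in range(num_slots):
        slot_hash, offset = SLOT.unpack_from(blob, table_offset + i * SLOT.size)
        if offset == 0:
            return None                    # empty slot: the key is not present
        if slot_hash == h:
            klen, vlen = RECORD.unpack_from(blob, offset)
            start = offset + RECORD.size
            if blob[start:start + klen] == key:   # guard against hash collisions
                return blob[start + klen:start + klen + vlen]
        i = (i + 1) % num_slots
    return None
```

Only the slot table needs to be resident in memory, and its size depends on the number of keys rather than their length. The size limits mentioned above follow from the offset width: a 32-bit offset addresses 2^32 bytes (4 GiB), and 8 more bits gives 2^40 bytes (1 TiB).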
Keys within SSTables are stored in lexicographic order, so that any two SSTable files can be merged in linear time and constant memory. Conversely, cdb files are arbitrarily ordered, to allow for new databases with a single updated value to be created in a way that minimally disturbs the ordering of the previous database.
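Merging sorted files is the standard streaming merge. A minimal Python sketch, assuming each input is an iterator of (key, value) pairs already sorted by key, that keys are unique within an input, and that inputs are passed oldest first so the newest value for a key wins; the function names are made up for the example.

```python
import heapq
from itertools import groupby
from operator import itemgetter


def _tag(run, age):
    """Attach the run's age so that newer runs sort first for equal keys."""
    for key, value in run:
        yield key, -age, value


def merge_sorted(*runs):
    """Merge sorted (key, value) runs, oldest first; the newest value wins."""
    tagged = [_tag(run, age) for age, run in enumerate(runs)]
    # heapq.merge holds only one pending entry per run in memory.
    merged = heapq.merge(*tagged, key=lambda t: (t[0], t[1]))
    for key, group in groupby(merged, key=itemgetter(0)):
        yield key, next(group)[2]          # first entry in a group is the newest


old = [(b"a", b"1"), (b"c", b"3")]
new = [(b"b", b"2"), (b"c", b"30")]
print(list(merge_sorted(old, new)))
```

Each input is read once from front to back, and memory use depends on the number of inputs rather than on their size.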
Lastly, contemporary SSTables use block compression, where contiguous values are grouped and compressed. If there is shared structure between values, this can greatly increase the compression ratio relative to compressing individual values, but it means that each lookup requires reading and decompressing more than a single value. While cdb doesn’t use any sort of compression, a similar scheme could be grafted onto it.
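The trade-off is easy to demonstrate with zlib. The sketch below is illustrative only: the JSON-like values are invented, and the length-prefixed block framing is an assumption, not the format used by SSTables or Riffle.

```python
import zlib


def compress_individually(values):
    """Compress each value on its own; any value can be read in isolation."""
    return [zlib.compress(v) for v in values]


def compress_block(values):
    """Compress a run of values together so shared structure is encoded once."""
    # Length-prefix each value so the block can be split again after decompression.
    body = b"".join(len(v).to_bytes(4, "big") + v for v in values)
    return zlib.compress(body)


def read_from_block(block, index):
    """Reading one value requires decompressing the whole block."""
    body = zlib.decompress(block)
    pos = 0
    for _ in range(index):
        pos += 4 + int.from_bytes(body[pos:pos + 4], "big")
    size = int.from_bytes(body[pos:pos + 4], "big")
    return body[pos + 4:pos + 4 + size]


# Made-up values that share most of their structure.
values = [b'{"name": "place-%d", "category": "restaurant", "country": "US"}' % i
          for i in range(100)]

individual_size = sum(len(c) for c in compress_individually(values))
block = compress_block(values)
print("individually:", individual_size, "bytes; as one block:", len(block), "bytes")
```

The block comes out much smaller because the repeated field names are encoded once, at the price of decompressing all 100 values to read any one of them. Block size is therefore a tuning knob between compression ratio and read cost.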
Our implementation cherry-picked the elements we wanted from each of these sources: fixed memory overhead per key, linear time merging, and block compression. While memory-mapping is used for the hashtable, values are read directly from disk, decoupling our I/O throughput from how much memory is available. The keys are consistently ordered to allow for linear merges, but not lexicographically, so unlike SSTables, range queries are not possible. The resulting library, Riffle, is far from novel, but is only ~600 lines of code and is something we can understand inside and out. This allowed us to write a simple set of Hadoop jobs that, given an arbitrary set of updated keys and values, construct a set of sharded Riffle indices. These indices can then be downloaded by our on-premise database servers and efficiently merged into the current set of values. A server which is new or has fallen behind can simultaneously download many such updates, merging them all together in constant space and linear time.
It’s worth noting that this approach of building entire databases using Hadoop is not new, as demonstrated by ElephantDB and others. One reason for avoiding ElephantDB is that it uses LevelDB under the covers (albeit in a read-only capacity), and generally assumes that the distributed filesystem and database servers are on the same local network and operated by the same people. It’s an excellent piece of infrastructure used to good effect in a number of places, but not quite what we needed.
Riffle is newer than s3-journal, but it has handled significant traffic and volume without any issues. For reference, here are some basic random read benchmarks, taken on an AWS instance using the riffle benchmark command:
Unfortunately, we haven’t gone through the exercise of loading the same 100 million entries into LevelDB, LMDB, or others, so it’s hard to say exactly how “good” these results are. However, given the use of memory-mapping by many of these databases, as well as the O(log n) lookups in the underlying on-disk LSM trees, compared to the O(1) lookups for Riffle, it’s likely that any such comparison would be favorable to Riffle on a dataset of similar size. For datasets where the memory-mapped regions actually fit in memory, however, the other databases would be faster. It’s worth noting, though, that in either case the use of Clojure to implement Riffle has no real impact on the performance, since it’s inescapably I/O bound.
Riffle inhabits an interesting local maximum in the design space. It’s certainly not a drop-in replacement for other key/value databases, but we think it will be useful in a broad range of applications, and look forward to seeing how it’s used elsewhere.