Adopting RocksDB within Manhattan

Thursday, 22 April 2021


Manhattan is Twitter’s internally developed distributed key-value store. It’s the default storage at Twitter for persistent real-time serving needs and is currently used to serve all core nouns at Twitter including Tweets, Users, and Direct Messages (DMs). Manhattan was designed from its inception to have a pluggable storage engine. The storage engine is the lowest layer of Manhattan's stack and it is responsible for storing and retrieving data within an individual node.

As outlined in an earlier blog post, Manhattan had two read-write storage engines: 

  • SSTable: An in-house LSM-based write-optimized storage engine
  • MhBtree: Our B-tree based read-optimized storage engine

Most of Manhattan's workloads in production are read-heavy. As a result, the majority of our clusters were running on MhBtree. While MhBtree gave good performance for read-heavy workloads, it had a few shortcomings: 

  1. Steep learning curve for developers: MhBtree is implemented as a MySQL plugin, which in turn uses the MySQL handler interface to communicate with the underlying InnoDB storage engine. The plugin alone consisted of 15k lines of C++ code. Making a change to this component required an engineer not only to navigate this large body of custom plugin code, but also to have a deep understanding of the internals of MySQL and InnoDB. This created a high barrier to development and made the component inaccessible to most of the team. The inaccessibility became an even bigger strategic risk once the primary authors of the plugin were no longer on the team.
  2. Poor compression: Compared to the SSTable storage engine (LSM-based), we observed that MhBtree took ~50% more space for the same amount of application data. This directly translated to increased costs for the clusters that were constrained by storage.
  3. Inefficient backups: To ensure availability, backups in MhBtree were done live (hot) and implemented via a copy-on-write (CoW) mechanism.

    • Within the CoW scheme, MhBtree would make a copy of each page that was modified while the backup was running. But when an insert/update workload is uniformly distributed (as is true for many Manhattan workloads), the CoW scheme degenerates to copying almost all of the pages within the database. Our CoW scheme was introducing additional load on the node, and in one extreme case, we had to over-provision the cluster by up to 50% just to handle the additional load incurred during backups.
    • In order to limit storage usage for backups, we also implemented incremental backups: we only copied pages that had changed since the last backup. But, as with the situation above, if the update workload is uniformly distributed, then this also degrades to nearly a full backup.
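
To see why a uniformly distributed write workload defeats both copy-on-write and incremental backups, here is a minimal back-of-the-envelope sketch. It assumes each write dirties exactly one page chosen uniformly at random, which is a simplification (a real B-tree write can touch several pages); the function name and the page counts are illustrative and not taken from Manhattan.

```python
def expected_fraction_copied(num_pages: int, num_writes: int) -> float:
    """Expected fraction of distinct pages dirtied by uniformly random writes.

    A given page escapes all writes with probability (1 - 1/num_pages) ** num_writes,
    so the expected fraction of pages that must be copied is one minus that.
    """
    return 1.0 - (1.0 - 1.0 / num_pages) ** num_writes


pages = 1_000_000
# Few writes relative to the database size: only a small fraction is copied.
light = expected_fraction_copied(pages, pages // 100)
# Writes totalling three times the page count: nearly every page is copied.
heavy = expected_fraction_copied(pages, 3 * pages)
print(f"light: {light:.4f}, heavy: {heavy:.4f}")
```

Under this model the copied fraction approaches 1 quickly as the number of writes grows past the number of pages, which is the degenerate case described above for both the CoW scheme and incremental backups.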

Due to the above shortcomings, our team explored alternative storage engines and discovered RocksDB, an LSM-based storage engine that was gaining increased adoption. In particular, its tunable read, write, and space amplification looked very promising and presented an opportunity to unify our two read-write storage engines into a single one that could be tuned differently for different workloads.

Evaluating RocksDB

In 2017, we kickstarted an evaluation of RocksDB as a new storage engine for Manhattan. Since RocksDB is also an LSM-based storage engine, we were reasonably confident that its performance could match that of SSTable for write-heavy workloads. Our biggest uncertainty was whether RocksDB could match MhBtree's performance for read-heavy workloads. We wanted to evaluate RocksDB’s performance on these workloads before investing a lot of time into integrating RocksDB into Manhattan and making it production-ready.

So we created a prototype RocksDB storage engine with reduced functionality and relaxed correctness requirements. We set up an experiment cluster with our prototype RocksDB backend as the backing store and ran a variety of synthetic workloads, predominantly the read-heavy workload of our largest cluster. After several rounds of tuning (RocksDB has a *lot* of options to tune), the results looked promising: 

  • ~50% reduction in disk usage (i.e., better compression) compared to our MhBtree backend
  • ~20% reduction in CPU usage
  • Equal or better latencies

These results gave us the confidence to move forward with RocksDB and invest in production-grade integration. 

Integration with RocksDB

Data model


The data model for Manhattan is described in this blog post from 2018:

"Manhattan provides a data model where each record contains a partition key (PKey) and a local key (LKey) mapping to a value. A partition key, as the name suggests, is used to partition a record onto different nodes. A partition key can have multiple local keys that are sorted."

Additionally, because Manhattan is a multi-tenant system – that is, a single cluster is shared by multiple applications/datasets – a third key is needed to disambiguate between different datasets within the same cluster. So together, a Manhattan key consists of 3 components: dataset ID, PKey, LKey. 


In addition to the user-provided value payload, Manhattan also maintains its own internal fields such as the timestamp of the last write/modification, an expiration timestamp (also known as “time to live”, or TTL), and a boolean flag that indicates whether a value is soft-deleted (called a “tombstone” flag). Manhattan by design does not rely on the storage engine to provide these values and instead maintains them itself at a higher abstraction level. This design choice is what enables Manhattan to have a pluggable storage engine.


RocksDB only supports raw bytes as keys and values. We encode the multi-component keys and values described above using a simple length-prefixed encoding: For each component, we first write out its size/length in bytes, followed by the serialized bytes of the component. 
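
The encoding described above can be sketched as follows. This is a minimal illustration, not Manhattan's actual wire format: the 4-byte big-endian length prefix and the function names are assumptions, since the width and byte order of the length field are not specified here.

```python
import struct


def encode_components(components):
    """Concatenate each component as <length prefix><raw bytes>."""
    out = bytearray()
    for comp in components:
        out += struct.pack(">I", len(comp))  # assumed: 4-byte big-endian length
        out += comp
    return bytes(out)


def decode_components(buf):
    """Inverse of encode_components: split a buffer back into its components."""
    components = []
    pos = 0
    while pos < len(buf):
        (length,) = struct.unpack_from(">I", buf, pos)
        pos += 4
        if pos + length > len(buf):
            raise ValueError("truncated component")
        components.append(buf[pos:pos + length])
        pos += length
    return components


def encode_key(dataset_id, pkey, lkey):
    """A Manhattan-style key: dataset ID, then PKey, then LKey."""
    return encode_components([dataset_id, pkey, lkey])


key = encode_key(b"ds1", b"abc", b"k1")
print(key.hex())
print(decode_components(key))
```

Because every component carries its own length, the decoder never needs a delimiter byte, so components may contain arbitrary binary data.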

Key ordering

RocksDB's default key ordering is based on a byte-wise comparison of keys. Since we have a custom encoding scheme for keys, the default byte-wise comparison would not be logically correct. For example, consider two PKeys, abc and d, for the same dataset. In their physical encodings, the dataset ID would be the same for both. After that, the length-encoding bytes for abc would have the value 3, while the same bytes for d would have the value 1. A byte-wise comparison of the length bytes would result in abc being greater than d since 3 is greater than 1, even though a logical comparison would normally have d be “greater than” abc.

To correctly implement the desired logical behaviour, we make use of RocksDB’s key comparator hook and override it with an implementation that decodes the logical components of a key and performs a component-wise logical comparison. 
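
The abc versus d example can be reproduced with a small Python model. This is a sketch of the idea only, not the C++ comparator itself; it assumes a 4-byte big-endian length prefix and assumes that each decoded component is ordered byte-wise, neither of which is stated in this post.

```python
import struct
from functools import cmp_to_key


def encode_key(*components):
    """Length-prefixed encoding (assumed 4-byte big-endian lengths)."""
    return b"".join(struct.pack(">I", len(c)) + c for c in components)


def decode_key(buf):
    """Split an encoded key back into its logical components."""
    components, pos = [], 0
    while pos < len(buf):
        (length,) = struct.unpack_from(">I", buf, pos)
        pos += 4
        components.append(buf[pos:pos + length])
        pos += length
    return components


def compare_keys(a, b):
    """Three-way comparison on decoded components instead of raw bytes."""
    ca, cb = decode_key(a), decode_key(b)
    return (ca > cb) - (ca < cb)  # lists compare component by component


k_abc = encode_key(b"ds1", b"abc", b"x")
k_d = encode_key(b"ds1", b"d", b"x")

# Raw byte-wise order is decided by the length prefix (3 > 1).
print("byte-wise: abc > d ?", k_abc > k_d)
# Component-wise order compares the PKey contents themselves.
print("logical:   abc < d ?", compare_keys(k_abc, k_d) < 0)
print(sorted([k_d, k_abc], key=cmp_to_key(compare_keys)) == [k_abc, k_d])
```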


As described in the Data model section above, Manhattan maintains its own application-level timestamp. This application-level timestamp is used to provide “last write wins” semantics to the user. In other words, a value with a higher Manhattan timestamp is “greater than” a value with a lower Manhattan timestamp, irrespective of which one was written last in RocksDB.

To achieve these semantics, we needed to define and use the Merge operator in RocksDB.

Merge operation

RocksDB doesn’t let you override the basic put/delete operations; instead, it provides an operation called Merge that allows applications to define custom read-modify-write semantics. The desired read-modify-write semantic for Manhattan can be defined in pseudocode as:

Replace existingValue if (existingValue.timestamp < newValue.timestamp)

In order to apply the above semantics for all writes, Manhattan exclusively uses the Merge operation for any write or mutate operations, and never uses the put/delete operation.

Merge operator

In order to make the Merge operation work, we implement our own custom Merge operator that implements the desired semantics. Given a list of candidate values (merge operands) for a key, it picks and returns the “latest” value, with "latest" being defined as the value with the largest Manhattan timestamp.
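
The “latest timestamp wins” rule can be modelled in a few lines. This is a hedged sketch of the selection logic only: the Value type, its fields, and the merge function are hypothetical names for illustration and do not mirror the RocksDB merge operator interface or Manhattan's value encoding.

```python
from typing import NamedTuple, Optional, Sequence


class Value(NamedTuple):
    timestamp: int   # Manhattan's application-level timestamp
    payload: bytes


def merge(existing: Optional[Value], operands: Sequence[Value]) -> Value:
    """Return the candidate with the largest timestamp.

    The existing value (if any) is listed first, and max() keeps the first
    maximal element, so an operand replaces the existing value only when its
    timestamp is strictly greater, matching the pseudocode above.
    """
    candidates = ([existing] if existing is not None else []) + list(operands)
    if not candidates:
        raise ValueError("nothing to merge")
    return max(candidates, key=lambda v: v.timestamp)


# A stale write that arrives late does not overwrite the newer value.
print(merge(Value(20, b"new"), [Value(10, b"old")]))
# With no existing value, the newest operand wins regardless of arrival order.
print(merge(None, [Value(5, b"a"), Value(7, b"b"), Value(6, b"c")]))
```

Taking a maximum is associative, so the operands can be combined in any grouping and still yield the same winner.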

Prefix extractor

Most read queries to Manhattan are of the form: 

    Get everything under a (dataset ID, PKey)
    Get a range/slice within a (dataset ID, PKey) from [LKey_start, LKey_end]

As you might have observed, both query types share the same prefix: (dataset ID, PKey). RocksDB allows you to configure a Prefix Extractor, and doing so enables RocksDB to use more efficient data structures that improve performance for such short-read use cases.

We built a prefix extractor that decodes the key and returns the bytes up until the end of the PKey as the “prefix”. In other words, the prefix consists of the dataset ID length, dataset ID bytes, PKey length, and PKey bytes.
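
A minimal Python model of that extraction is shown below. It is a sketch under assumptions: the 4-byte big-endian length prefix and the function names are illustrative, and the real extractor is a C++ component plugged into RocksDB rather than this code.

```python
import struct


def encode_key(dataset_id, pkey, lkey):
    """Length-prefixed key encoding (assumed 4-byte big-endian lengths)."""
    return b"".join(struct.pack(">I", len(c)) + c for c in (dataset_id, pkey, lkey))


def extract_prefix(key):
    """Return the bytes covering the dataset ID and PKey, lengths included."""
    pos = 0
    for _ in range(2):  # skip over dataset ID, then PKey
        (length,) = struct.unpack_from(">I", key, pos)
        pos += 4 + length
    if pos > len(key):
        raise ValueError("key too short to contain dataset ID and PKey")
    return key[:pos]


k1 = encode_key(b"ds1", b"user42", b"a")
k2 = encode_key(b"ds1", b"user42", b"zzz")
k3 = encode_key(b"ds1", b"user43", b"a")

# Keys under the same (dataset ID, PKey) share a prefix; other PKeys do not.
print(extract_prefix(k1) == extract_prefix(k2))
print(extract_prefix(k1) == extract_prefix(k3))
```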

Compaction filter

As described in the Data model section above, Manhattan maintains its own tombstone flag to indicate whether a value is deleted at the application level, as well as a TTL field. As a result, it is Manhattan’s responsibility to implement its own “garbage collection” mechanism. RocksDB conveniently provides a compaction filter hook that allows applications to perform their own garbage collection.

Manhattan makes use of the hook and implements its own compaction filter that inspects values for tombstone and TTL fields, using this data to decide whether to keep or remove a value. 
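
The keep-or-remove decision can be sketched as below. This is a simplified model with hypothetical names and fields: it drops every tombstone and every expired value unconditionally, whereas the real filter also consults dataset metadata, and its exact policy is not described in this post.

```python
from typing import Dict, NamedTuple, Optional


class Value(NamedTuple):
    timestamp: int
    payload: bytes
    tombstone: bool = False           # soft-delete flag
    expires_at: Optional[int] = None  # absolute expiry time; None means no TTL


def should_remove(value: Value, now: int) -> bool:
    """Decide whether compaction may drop this value."""
    if value.tombstone:
        return True
    if value.expires_at is not None and value.expires_at <= now:
        return True
    return False


def compact(entries: Dict[bytes, Value], now: int) -> Dict[bytes, Value]:
    """Model of one compaction pass: keep only values the filter retains."""
    return {k: v for k, v in entries.items() if not should_remove(v, now)}


entries = {
    b"live": Value(1, b"a"),
    b"expired": Value(2, b"b", expires_at=50),
    b"deleted": Value(3, b"", tombstone=True),
    b"future": Value(4, b"c", expires_at=150),
}
print(sorted(compact(entries, now=100)))
```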


Barring the compaction filter, all of the components described above were implemented in C++ for performance reasons. The compaction filter, however, was implemented in Java for two reasons:

  1. Our previous LSM backend (SSTable) was written in Java and had a fully implemented compaction filter. We wanted to reuse the compaction filter code rather than write it again in C++.
  2. The compaction filter logic has a dependency on viewing dataset metadata stored in Zookeeper; since Manhattan is a Java application, we already have Java implementations of these metadata objects. If we were to implement the filter in C++, we would have to create and maintain mirror classes of all of these Zookeeper dataset metadata objects. We wanted to avoid taking on this and other similar C++/Java plumbing work, which is error-prone and comes with maintenance overhead. 

Java Native Interface (JNI) attachment mechanics

Unfortunately, our first Java implementation caused a slowdown in compaction performance. With significant help from the amazing JVM team at Twitter, we tracked this slowdown to the thread “attach” and “detach” steps that occur during JNI handoff:

For every key-value pair in the compaction input, the C++ compaction thread makes a JNI call that first “attaches” to the JVM. It then calls FilterV2, and finally “detaches” from the JVM. This attachment and detachment process is controlled via a mutex lock, which is the source of the slowdown.

To mitigate this, we removed the “detach” phase completely. This works because:

  1. Re-attachment is quick and idempotent.
  2. The pool of compaction threads is fixed, so we pay the attachment cost only once.

(Note, however, that the latter point is not true if you enable subcompactions, since subcompactions are executed on ad-hoc thread pools instead of from a fixed background thread pool.)
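
The effect of dropping the detach step can be illustrated with a toy model. This is not JNI code: FakeJvm and both filter functions are hypothetical stand-ins, and the model assumes that attaching an already attached thread returns without touching the mutex. It only counts how often the shared mutex is taken in each scheme.

```python
import threading


class FakeJvm:
    """Stand-in for a JVM whose attach/detach bookkeeping sits behind one mutex."""

    def __init__(self):
        self._mutex = threading.Lock()
        self._local = threading.local()
        self.mutex_acquisitions = 0

    def attach(self):
        if getattr(self._local, "attached", False):
            return  # assumed: re-attaching an attached thread is a cheap no-op
        with self._mutex:
            self.mutex_acquisitions += 1
            self._local.attached = True

    def detach(self):
        with self._mutex:
            self.mutex_acquisitions += 1
            self._local.attached = False


def filter_with_detach(jvm, pairs, keep):
    """Original scheme: attach, filter, detach for every key-value pair."""
    kept = []
    for pair in pairs:
        jvm.attach()
        if keep(pair):
            kept.append(pair)
        jvm.detach()
    return kept


def filter_without_detach(jvm, pairs, keep):
    """Mitigation: still attach per pair, but never detach."""
    kept = []
    for pair in pairs:
        jvm.attach()
        if keep(pair):
            kept.append(pair)
    return kept


def is_even(n):
    return n % 2 == 0


jvm_a, jvm_b = FakeJvm(), FakeJvm()
out_a = filter_with_detach(jvm_a, range(1000), is_even)
out_b = filter_without_detach(jvm_b, range(1000), is_even)
print(jvm_a.mutex_acquisitions, jvm_b.mutex_acquisitions)
```

In this model a long-lived thread takes the mutex once in total instead of twice per pair, which is why a fixed thread pool matters: a freshly created thread pays the attachment cost again.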

More details on this experimental approach can be found in this RocksDB pull request.


It took about a year for us to implement everything described above and finish productionizing the new RocksDB storage engine; this work was complete by mid-2018. Over the next two years, we migrated several Manhattan clusters to RocksDB, including clusters that hosted several of the core nouns at Twitter: Tweets, User Profiles, and Direct Messages (DMs). All of this data amounts to trillions of keys and petabytes of storage.

Through this migration, we:

  • Provided the same or better latency as our previous storage engines, with lower CPU utilization and approximately 50% lower disk usage
  • Used the improved efficiency to shrink certain clusters and, on others, to handle traffic growth without new hardware
  • Deleted a third of our storage engine codebase and nearly 10% of the Manhattan codebase

While the migration ended in a happy place, the journey was not entirely smooth. Future blog posts will describe:

  1. The methodology we used to gain confidence that we won’t lose/corrupt data during the migration
  2. Several of the performance/stability problems that we encountered before and during the production rollout


It took a village to get this large migration project across the finish line. 

Eng: Ben Clay, Vijay Teja Gottipati, Sowmya Pary, Jigar Bhati, Sahib Pandori, Donghua Liu, Kevin Yang, Justin Hendryx, Sean Ellis, Dhaval Patel 

EMs: Rashmi Ramesh, Unmesh Jagtap, Karthik Katooru

Special thanks to Andrew Kryczka, who provided us with deep RocksDB knowledge and helped solve some of our technical challenges during the migration.

