
Sharding Redis

A Brief History of Redis at Flite

Here at Flite we use Redis as the backend for realtime metrics and the ad debugger. We use counters to tabulate various events such as impressions, interactions, and content events, as well as storing the body of each event – about 30 counters per event. Until recently, this data lived on a single Redis server for about 4 hours before it expired or was replaced by newer data. Over the past year of using this setup we experienced problems with our data set growing too large and pushing data out in as little as 2 hours. We solved these early growing pains by moving to a server with more memory and by code optimizations such as gzipping.

However, a single server cannot scale forever…

Scalability Fail

We knew the single-server solution couldn’t last forever and had plans to do something about it. However, when a large traffic spike brought down the realtime system, we had to act fast to diagnose the nature of the problem and engineer a solution. We quickly discovered that our Redis box was using 100% of one of its CPUs (did I mention that Redis is single-threaded?), causing client connections to be refused.

We were already using one of the fastest EC2 instance types (m2.2xlarge), so going to a larger instance would have bought only 10% more CPU at a huge increase in cost. The other options available to us were Redis-as-a-Service and several modes of sharding. Whatever the solution, it had to be compatible with our existing infrastructure, which relies on Java to write realtime data, Node.js to read it, and some Python monitoring code.

We considered a couple of alternatives:

The first was Twitter’s twemproxy (aka nutcracker), which sits in front of Redis and shards the data based on a configured hash function. It is mentioned in the Redis documentation and works pretty well for the commands it supports. However, twemproxy doesn’t support some basic commands such as PING, INFO, and MULTI. The PING command in particular is a problem, because the Jedis client uses it before handing out connections from the JedisPool.

The other alternative we tried was a Redis-as-a-Service provider that advertises autoscaling, autosharding, high performance, and so on. It turned out to be quite expensive compared to running our own Redis instances on EC2. Moreover, when we tried to validate the service, it wasn’t capable of handling the volume of data we are dealing with.

Built-in Redis Sharding

The Jedis client we use in Java has sharding support built in. It seemed pretty straightforward to set up; however, it has one catch that made it impractical for our use: it uses a fairly complicated consistent-hashing algorithm to determine the shard. Since we needed both Node and Python to read the data, we decided that reimplementing a complex sharding algorithm in two other languages was more work than we wanted to take on. Consistent hashing is also unnecessary for our application, where the data goes out of date within a few hours and we expected to change the sharding setup infrequently.

Sharding – how hard can it be?

After considering the above alternatives, we decided to try writing the sharding ourselves. After all, how hard can it be?

The first stab

Our first approach was to take the Java hashCode of each key and mod it by the number of shards to get the number of the shard to use, so every key lives on one and only one node. This has the advantage of being very simple to retrofit to the Node and Python read code – the only extra work required is to determine which node to connect to and then read the value as before.

The write side was a little more complicated. Since each metrics request breaks down into 30 counters, and therefore 30 keys, and each key independently determines which shard it belongs on, every incoming metrics event ends up writing to every shard. We wrote code to manage N JedisPools, open N Jedis connections (in parallel), manage N Pipelines, and direct each write to the appropriate Pipeline based on the key. We decided to use N=4 in production, but the code was written to be flexible.
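
A rough sketch of that per-key routing (a hedged illustration, not our production code; the FirstStabWriter class, the writeCounters method, and its arguments are made up for this example):

import java.util.List;
import java.util.Map;

import redis.clients.jedis.Pipeline;

// Hypothetical sketch of the first-stab write path: each of the ~30 counter keys
// independently picks a shard, so a single metrics event touches every pipeline.
public class FirstStabWriter {

    public void writeCounters(Map<String, Long> counters, List<Pipeline> pipelines) {
        int numberOfShards = pipelines.size();
        for (Map.Entry<String, Long> counter : counters.entrySet()) {
            int shard = Math.abs(counter.getKey().hashCode()) % numberOfShards;
            pipelines.get(shard).incrBy(counter.getKey(), counter.getValue());
        }
        // flush every pipeline once all writes for the event have been queued
        for (Pipeline pipeline : pipelines) {
            pipeline.sync();
        }
    }
}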

So now we were writing 1/4 of the data to each Redis instance per request. However, each Redis instance was still receiving the same number of connections as before, and therein lay the rub. At around 500 connections per second, Redis seems to be connection-limited rather than limited by the number of bytes or operations. This also meant that our scalability was broken – adding more Redis boxes would not reduce the number of connections to each instance. In fact, adding Redis instances would eventually cripple the Metrics JVMs, as they would have to maintain that many more connections.

A better approach

Flite strongly believes in Agile, and what better way to be agile than to fail fast? After a day of running our first stab in production, we realized that we didn’t quite get it right. We analyzed what was going on and saw that the sheer number of connections was overwhelming Redis. Obviously our sharding mechanism needed work.

Based on our write-heavy access patterns, we concluded that the best way forward was to focus on making writes efficient, even if that made reads far more expensive. The simplest way to do that is to make sure that each metrics request connects to one and only one Redis instance. The complication is that the Node and Python code must now connect to all 4 Redis nodes in order to assemble the data: counters that previously lived on one node have to be read from all 4 and summed, and lists concatenated and re-sorted.
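
To illustrate what that does to reads, here is a minimal sketch of summing one counter across shards (shown in Java purely for illustration; our actual readers are the Node and Python code shown later):

import java.util.List;

import redis.clients.jedis.Jedis;

// Illustration only: a counter that used to live on a single node must now be
// fetched from every shard and the values summed.
public class CrossShardCounterReader {

    public long readCounter(String key, List<Jedis> shards) {
        long total = 0;
        for (Jedis jedis : shards) {
            String value = jedis.get(key);
            if (value != null) {
                total += Long.parseLong(value);
            }
        }
        return total;
    }
}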

Finally, some code!

Java

The basic structure we used is as follows:

RedisNodeManager – Spring-instantiated bean that takes node configuration information on startup and sets up a list of JedisPoolManagers. This bean also returns a RedisPipelineManager.

JedisPoolManager – Contains information necessary to manage one JedisPool. This includes connection info for one Redis node, as well as code to get a PoolAwareJedis client.

PoolAwareJedis – The Jedis 2.0.0 implementation we use does not carry information about which pool a specific Jedis connection came from. Since connections need to be released by calling pool.returnResource, a wrapper class was necessary. This class simply contains a reference to one Jedis and one JedisPool (a rough sketch appears below, after these class descriptions).

RedisPipelineManager – An instance of this class is created for every session of writing to Redis. It is created with a List of JedisPoolManagers and is responsible for handing out standard Jedis Pipelines. Which Pipeline it returns is based on a shardKey, and Pipelines are only created on demand. While this places the burden on the consuming code to make sure that only one shardKey is used per session, it is more flexible in adapting to different sharding strategies. This class is also responsible for syncing all of the pipelines and releasing the connections when done.
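
Here is a rough sketch of the PoolAwareJedis wrapper described above (the release method name is illustrative):

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

// Hedged sketch: keep the borrowed Jedis connection together with the pool it
// came from, so it can be returned to the right pool when the session is done.
public class PoolAwareJedis {

    private final Jedis jedis;
    private final JedisPool pool;

    public PoolAwareJedis(Jedis jedis, JedisPool pool) {
        this.jedis = jedis;
        this.pool = pool;
    }

    public Jedis getJedis() {
        return jedis;
    }

    public void release() {
        // connections borrowed from a Jedis 2.0.x pool must be returned explicitly
        pool.returnResource(jedis);
    }
}

The shard-routing core of RedisPipelineManager looks like this: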

public Pipeline getPipeline(String shardKey) {
    return getPipeline(getShardNumber(shardKey, jedisPools.size()));
}

public Pipeline getPipeline(int shardNbr) {
    if (pipelines.get(shardNbr) == null) {
        try {
            jedises.set(shardNbr, jedisPools.get(shardNbr).getClient());
            pipelines.set(shardNbr, jedises.get(shardNbr).getJedis().pipelined());
        } catch (Exception e) {
            LOG.warn("Error getting Redis Client # " + shardNbr + ": " + e.getMessage());
            LOG.debug("Error getting Redis Client # " + shardNbr, e);
            throw new RuntimeException(e);
        }
    }
    return pipelines.get(shardNbr);
}

public static Integer getShardNumber(String key, Integer numberOfShards) {
    // returns a number from 0 to numberOfShards-1
    return Math.abs(key.hashCode()) % numberOfShards;
}
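
To tie these pieces together, here is a hedged sketch of what a write session might look like with this structure (the getPipelineManager and syncAndRelease names, the eventId shard key, and the incrBy loop are illustrative assumptions, not our exact API):

// Illustrative write session: one shard key per metrics event, so all ~30 of the
// event's counters travel down a single connection/pipeline.
RedisPipelineManager session = redisNodeManager.getPipelineManager(); // assumed accessor name
Pipeline pipeline = session.getPipeline(eventId);                     // eventId: assumed per-event shard key
for (Map.Entry<String, Long> counter : counters.entrySet()) {
    pipeline.incrBy(counter.getKey(), counter.getValue());
}
session.syncAndRelease(); // assumed name for "sync all pipelines and release connections"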

Node.js

One of the main challenges on the JavaScript side was to find a hash function that returns the same value in both Java and JavaScript for a given key. Many hash functions in Java return a long, which can range from -2^63 to 2^63-1, but JavaScript can only represent integers exactly up to 2^53, so we couldn’t use a hash function that returns a long. After a bit of fiddling around we found a JavaScript implementation of the java.lang.String.hashCode() method, which returns a 32-bit signed integer.

/**
 * Equivalent to the java.lang.String.hashCode() method;
 * returns a 32-bit signed integer.
 * Refer: http://cwestblog.com/2011/10/11/javascript-snippet-string-prototype-hashcode/
*/
String.prototype.hashCode = function() {
  for(var ret = 0, i = 0, len = this.length; i < len; i++) {
    ret = (31 * ret + this.charCodeAt(i)) << 0;
  }
  return ret;
};
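
For reference, the Java side uses String.hashCode() directly, so a quick cross-language sanity check might look like the following (the key format and shard count here are made up):

// Illustrative check: the shard chosen here should match what the JavaScript
// hashCode() port above computes for the same key.
public class ShardHashCheck {

    public static void main(String[] args) {
        String key = "metrics:12345:impressions"; // hypothetical key, just for the check
        int numberOfShards = 4;
        int hash = key.hashCode();                // 32-bit signed, same as the JS version
        int shard = Math.abs(hash) % numberOfShards;
        System.out.println(key + " -> hash " + hash + " -> shard " + shard);
    }
}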

As mentioned earlier, the Node.js app and the Python code fan each read request out to all of the Redis hosts and assemble the responses before returning data. Most of our read use cases fetch the bulk of the data for a given Redis command, so each of these requests is pipelined (MULTI).

FanOutRedisPipeline.prototype.getData = function(keys, cmd, options, callback) {
    if (!callback) {
        logger.error("callback can't be null. cmd:" + cmd);
        return;
    }
    this.callback = callback;
    options = options || [];

    // Queue the same command for every key on every shard's pipeline,
    // then exec each pipeline and collect the replies as they come back.
    _.each(this.pipelines, function(pipeline, pidx) {
        _.each(keys, function(key) {
            var cmdParams = _.clone(options);
            cmdParams.unshift(key);
            pipeline[cmd](cmdParams);
        });

        pipeline['exec'](_.bind(this.onPipelineResponse, this));
    }, this);
};

FanOutRedisPipeline.prototype.onPipelineResponse = function(err, res) {
    if (err) {
        logger.error("exec failed for one of the Redis pipelines");
        res = [];
    }
    this.replies.push(res);
    --this.currentRedisClientCount;

    // Once every shard has replied, merge the per-shard responses key by key
    // and hand the combined result to the caller.
    if (this.currentRedisClientCount === 0) {
        var grouped = [], result = [];

        for (var i = 0; i < _.first(this.replies).length; i++) {
            grouped = _.pluck(this.replies, i);
            result.push(constructResponse(grouped));
        }
        this.callback(err, result);
    }

    function constructResponse(groupedRes) {
        .....
        .....

        return mergedRes;
    }
};

One last gotcha

After deploying the above sharding code, we discovered that Redis was still not completely healthy. Even though we were running two instances of the single-threaded Redis on each host, we were only seeing 140% CPU usage, out of a possible 400%. It turns out that both instances of Redis were being bound to the same CPU core! A quick addition to the Redis startup script fixed that right up:

taskset -c -p 0 `cat /var/run/redis/redis1.pid`
taskset -c -p 3 `cat /var/run/redis/redis2.pid`

Now each Redis instance is bound to a different CPU core and all is well in Redisland.

Lessons learned

Scalability is tricky. Sharding correctly is tricky. Determining the true causes of scalability failures is tricky. It took us several releases to diagnose what was really making Redis fall down. Partly this was because we over-relied on Redis’s advertised benchmarks, which turned out not to be accurate for our usage patterns.

Another important lesson is to pay more attention to read/write patterns. This system writes hundreds of thousands of events every minute, while only a hundred users may be monitoring their ads’ realtime performance at any given time. Simplifying the write side as much as possible, at the expense of complicating the read side, is the right call in this situation.

Theory and practice are equivalent only in theory.