behind the gist

thoughts on code, analysis and craft

Circular buffers in Redis

I’ve pointed out before how I publish real-time location data to a browser. One of the context items I provide is “tails” showing the recent location history of the detected devices. In that case, the context was on the order of a minute or so, to give a sense of motion and activity. Another important aspect of context is being able to see recent patterns of behavior. For that we need more context, on the order of an hour or more, but that can be a large amount of data. To handle this efficiently, I set up a circular buffer in Redis representing a fixed time window. This way, clients can grab the whole context as needed (for example, at startup) or grab just a portion to catch up since the last time the view was open. On the JavaScript side, we use a DataView to unpack the compact binary records and keep the transmission size small.

Like I’ve said before, I really like Redis. One of the things Redis lets you do is manipulate “strings”, though a Redis string can hold any binary data, and Redis provides commands to perform bit and byte operations on them. It is pretty straightforward to implement a circular buffer using these commands. But for my use case, I want the buffer to represent a fixed time duration, not a fixed size, so we’ll need a separate index to map time ranges to byte ranges. One of the great things about Redis is that it provides just the structure we need to build such an index: a sorted set.
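
To make those building blocks concrete, here is a rough sketch (not from the gist) of the raw commands involved, using the redis-rb gem; the key names and values are placeholders:

building blocks (sketch)
require 'redis'

redis = Redis.new

# a Redis string manipulated by byte offset
redis.setrange('history', 0, "\x00" * 24)   # write 24 bytes starting at offset 0
redis.getrange('history', 0, 23)            # read them back

# a sorted set mapping timestamps (scores) to buffer indices (members)
redis.zadd('history:index', 1_700_000_000_000, 0)
redis.zrangebyscore('history:index', 1_600_000_000_000, '+inf', limit: [0, 1])

# a plain counter used to pick the next slot
redis.incr('history:counter')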

We are going to be storing a simple record in the buffer, consisting of a timestamp, an identifier and coordinates.

record
def record(millis, id, x, y)
  # javascript DataView doesn't support unsigned longs so use doubles for them
  format = 'GGgg' # big-endian double, double, float, float
  [millis.to_i, id.to_i, x.to_f, y.to_f].pack(format)
end
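
As a quick check (my own example, not from the gist), the format round-trips with unpack, and each record is a fixed 24 bytes (8 + 8 + 4 + 4):

record check (sketch)
r = record(1_700_000_000_000, 42, 1.5, -2.25)
r.bytesize          # => 24
r.unpack('GGgg')    # => [1700000000000.0, 42.0, 1.5, -2.25]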

When we add a record to the buffer, we save the record index in the sorted set using the timestamp as the “score”.

append
def append(millis, id, x, y)
  record_index = append_record(record(millis, id, x, y))
  redis.zadd(time_index_key, millis.to_i, record_index)
end

def append_record(record)
  next_index = incr_record_index
  offset = next_index * record_length
  redis.setrange(history_key, offset, record)
  next_index
end

def incr_record_index
  redis.incr(counter_key).try {|i| i.to_i % capacity}
end

Note that we are using three different keys in Redis: the circular buffer (history_key), the time index (time_index_key), and a record counter (counter_key). The “circular” part of the circular buffer comes from the modulo we apply in incr_record_index. Once we determine the index, we write the record at the corresponding byte offset and store the index in the sorted set under the record’s timestamp.
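
The snippets lean on a few helpers that aren’t shown; a minimal sketch of the setup they assume might look like this (the key values, the capacity, and the Redis connection are my own guesses, and the .try calls come from ActiveSupport):

setup (sketch)
require 'redis'
require 'active_support/core_ext/object/try'  # provides the .try used in the snippets

def redis
  @redis ||= Redis.new
end

def history_key;    'history';         end
def time_index_key; 'history:index';   end
def counter_key;    'history:counter'; end

def record_length
  24   # bytes per packed record: 8 + 8 + 4 + 4
end

def capacity
  # number of slots in the buffer; size it to cover the desired time
  # window (an hour or more) at the expected record rate (a guess, not the gist's value)
  10_000
end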

Retrieval of a particular time range is straightforward as well.

retrieve history
def history_since(millis)
  current_index = current_record_index
  start_index = record_index_at(millis)

  return "" if current_index.nil? || start_index.nil?

  first_char = start_index * record_length
  last_char = (current_index + 1) * record_length - 1

  get_circular_range(first_char, last_char)
end

def get_circular_range(first_char, last_char)
  if first_char > last_char
    redis.getrange(history_key, first_char, -1) + redis.getrange(history_key, 0, last_char)
  else
    redis.getrange(history_key, first_char, last_char)
  end
end

def record_index_at(millis)
  redis.zrangebyscore(time_index_key, millis, 'inf', limit: [0, 1]).first.try(:to_i)
end

def current_record_index
  redis.get(counter_key).try {|i| i.to_i % capacity}
end

Again, we use all three keys. The counter key tells us the current index. We use the time index to figure out which index occurs on or after the desired timestamp. And finally we retrieve the records from the history key.
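
As a quick usage sketch (my own example, not from the gist), pulling the last hour on the server and splitting the raw bytes back into records looks something like this:

usage (sketch)
# last hour of history, as raw packed bytes
one_hour_ago = (Time.now.to_f * 1000).to_i - 3_600_000
raw = history_since(one_hour_ago)

# split the bytes back into [millis, id, x, y] tuples (handy for testing)
records = (0...(raw.bytesize / record_length)).map do |i|
  raw[i * record_length, record_length].unpack('GGgg')
end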

On the JavaScript side, we need to unpack the records we’ve been sent and we do this using a DataView.

unpack
function update_history(history) {
  var buf = base64_to_array_buffer(history);
  var view = new DataView(buf);
  var offset = 0;
  for (var i = 0; offset < buf.byteLength; ++i) {
    var time = view.getFloat64(offset);
    offset += 8;
    var id = "" + view.getFloat64(offset);
    offset += 8;
    var x = view.getFloat32(offset);
    offset += 4;
    var y = view.getFloat32(offset);
    offset += 4;

    var d = devices.get(id);
    if (!d) {
      d = new Device(id);
      devices.set(id,d);
    }
    d.update_history_location({time: time, x: x, y: y});
  }
}

Extracting the fields is a little tedious since there isn’t a stream-like interface to a DataView; we have to do the byte-offset bookkeeping ourselves. Also note that the server sends the history Base64-encoded, so we need to decode it before we can wrap it in a DataView.
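
That server-side encoding isn’t shown in the snippets here; presumably it amounts to strict Base64 (no line breaks) over the raw history string, along these lines (encoded_history_since is a hypothetical name):

encode (sketch)
require 'base64'

# hypothetical helper: Base64-encode the raw buffer slice for transport
def encoded_history_since(millis)
  Base64.strict_encode64(history_since(millis))
end

Back in the browser, the core part of the decoding is creating the ArrayBuffer from that Base64 payload.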

simplified Base64 decoding
// assume correct padding, e.g. length is exact multiple of 4
var chunks = encoded_history.length >> 2;
var bytes = 3 * chunks;
var buf = new ArrayBuffer(bytes);
var byte_buf = new Uint8Array(buf);

for (var chunk = 0; chunk < chunks; ++chunk) {
  byte_buf.set(bytesAtBase64Chunk(encoded_history,chunk), chunk*3);
}

We create a buffer of the appropriate length (three decoded bytes for every four encoded characters), and then wrap it in a Uint8Array to fill it chunk-by-chunk.

The full gist is available here.