An ORM-ish Layer for RocksDB

RocksDB is a convenient on-disk sorted map. Keys are stored in sorted order, so if you can model your lookup patterns as binary-search-style point lookups and range scans, which most use cases can, it works really well. The annoying part is that the default lexicographic (bytewise) ordering doesn’t match the natural ordering of non-string data types. If you store user ids 10 and 2 as strings and then range scan for all users with ids less than 5, both show up, because the string “10” sorts before the string “5”; that breaks the range scan model. The other difficulty is storing something more domain-specific like a ‘User’ object: you have to decide both how to serialize the object and what the ‘key’ for a ‘User’ should be.
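
A quick sketch of that trap, assuming a plain in-memory TreeSet as a stand-in for RocksDB's key ordering (for ASCII digits, string order and bytewise order agree). The function names and the 10-digit padding width are made up for illustration.

```kotlin
import java.util.TreeSet

// Ids stored as plain strings are compared character by character.
fun idsBelowFiveAsStrings(): List<String> {
    val ids = TreeSet(listOf("2", "10", "7"))
    // "10" < "5" because '1' < '5', so the scan wrongly includes user 10.
    return ids.headSet("5").toList()
}

// Zero-padding to a fixed width (assumed: 10 digits) makes string order
// agree with numeric order for non-negative ids.
fun idsBelowFivePadded(): List<String> {
    val pad = { n: Int -> n.toString().padStart(10, '0') }
    val ids = TreeSet(listOf(2, 10, 7).map(pad))
    return ids.headSet(pad(5)).toList()
}
```

The first function returns both "10" and "2"; the padded variant returns only the entry for id 2. Padding is the crudest fix and wastes space, which is part of why a binary encoding is attractive.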

Attempt 1 - Serializer + Comparator

I didn’t have the final interface that I wanted just yet, but my first idea was that I needed two pieces to put something in RocksDB. One is a serializer, which converts the thing into binary, and the other is a comparator of some sort, which gives the proper sorting semantics. That looked something like

interface BinarySerializer<T> {
    fun serialize(v: T): ByteArray
    fun deserialize(bytes: ByteArray): T
}

// Delegates ordering to K.compareTo by deserializing both keys on every comparison.
class KotlinComparator<K : Comparable<K>>(
    private val ser: BinarySerializer<K>,
    opts: ComparatorOptions = ComparatorOptions()
) : AbstractComparator(opts) {
    override fun name() = "kotlin-cmp-${ser::class.simpleName}"
    override fun compare(a: Slice, b: Slice): Int {
        val ka = ser.deserialize(a.data())
        val kb = ser.deserialize(b.data())
        return ka.compareTo(kb)
    }
}

// ... omitted code

And then the usage would look something like

object StringSer : BinarySerializer<String> {
    override fun serialize(v: String) = v.toByteArray(Charsets.UTF_8)
    override fun deserialize(b: ByteArray) = String(b, Charsets.UTF_8)
}

data class Person(val id: Int, val name: String) : Comparable<Person> {
    override fun compareTo(other: Person) = id.compareTo(other.id)
}

object PersonSer : BinarySerializer<Person> {
    override fun serialize(v: Person) = "${v.id}|${v.name}".toByteArray()
    override fun deserialize(b: ByteArray): Person {
        val s = String(b)
        val p = s.indexOf('|')
        return Person(s.substring(0, p).toInt(), s.substring(p + 1))
    }
}

// string -> Person data class sorted map using rocksdb
db.openTree("people_idx", StringSer, PersonSer)

This is fine for trying something out, but as usage ramps up it causes issues, since it deserializes both keys every time RocksDB compares two of them, for example during compaction or an iterator seek. It would be better to move the work from compare time to write time: you pay once to encode the key into ‘directly sortable bytes’ when you put(), and every later compare is a plain byte comparison with no deserialization.
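
To make ‘directly sortable bytes’ concrete, here is a minimal sketch for Int keys. It assumes the store compares keys as unsigned bytes, which is what RocksDB's default bytewise comparator does; `Arrays.compareUnsigned` (Java 9+) stands in for that comparator, and the function names are made up for illustration.

```kotlin
import java.nio.ByteBuffer
import java.util.Arrays

// Big-endian layout plus a flipped sign bit: unsigned bytewise order then
// matches numeric order, with negatives sorting before positives.
fun encodeInt(v: Int): ByteArray =
    ByteBuffer.allocate(4).putInt(v xor Int.MIN_VALUE).array()

fun decodeInt(b: ByteArray): Int = ByteBuffer.wrap(b).int xor Int.MIN_VALUE

// Stand-in for a bytewise comparator: unsigned lexicographic comparison.
fun bytewise(a: ByteArray, b: ByteArray): Int = Arrays.compareUnsigned(a, b)

// Sorts purely on the encoded bytes; values are decoded only at the end.
fun sortedByEncodedBytes(values: List<Int>): List<Int> =
    values.map(::encodeInt)
        .sortedWith(Comparator { a, b -> bytewise(a, b) })
        .map(::decodeInt)
```

Sorting only ever looks at the raw bytes, which is the point: the cost of getting the order right is paid once in `encodeInt`.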

Attempt 2 - universal byte encoder

By this point I was starting to see the final vision of what I wanted: basically a ‘map’ between two data classes, one for the key and one for the value, which would read like a regular JVM TreeMap but be backed by RocksDB on disk.

So then I thought: what if I had a serializer that knows the data types that make up the key (several of them, if it is a compound key) and writes the bytes in a way that maintains sort order? That led to the ordered codec class.

import java.nio.ByteBuffer
import java.nio.ByteOrder

interface Field<T> {
    fun write(buf: ByteBuffer, value: T)
    fun read(buf: ByteBuffer): Any
}

class OrderedCodec<T : Any>(
    private val ctor: (List<Any>) -> T,
    private val fields: List<Field<T>>
) : BinarySerializer<T> {

    override fun serialize(v: T): ByteArray {
        // fixed-size scratch buffer: a ByteBuffer does not grow, so keys that
        // encode to more than 256 bytes throw BufferOverflowException
        val buf = ByteBuffer.allocate(256).order(ByteOrder.BIG_ENDIAN)
        fields.forEach { it.write(buf, v) }
        return buf.array().copyOf(buf.position())
    }

    override fun deserialize(bytes: ByteArray): T {
        val buf = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN)
        val values = fields.map { it.read(buf) }
        return ctor(values)
    }
}

// DSL helpers
class CodecBuilder<T : Any> {
    private val fields = mutableListOf<Field<T>>()

    fun string(get: (T) -> String) = fields.add(object : Field<T> {
        override fun write(buf: ByteBuffer, v: T) {
            // UTF-8 bytes plus a 0x00 terminator sort lexicographically; a length
            // prefix would sort by length first. Assumes the string contains no NUL.
            buf.put(get(v).toByteArray(Charsets.UTF_8))
            buf.put(0.toByte())
        }
        override fun read(buf: ByteBuffer): Any {
            val start = buf.position()
            while (buf.get() != 0.toByte()) { /* advance to the terminator */ }
            val len = buf.position() - start - 1
            return String(buf.array(), buf.arrayOffset() + start, len, Charsets.UTF_8)
        }
    })

    fun int(get: (T) -> Int) = fields.add(object : Field<T> {
        override fun write(buf: ByteBuffer, v: T) {
            // flip the sign bit so negatives sort before positives as unsigned bytes
            buf.putInt(get(v) xor Int.MIN_VALUE)
        }
        override fun read(buf: ByteBuffer) = buf.int xor Int.MIN_VALUE
    })

    fun long(get: (T) -> Long) = fields.add(object : Field<T> {
        override fun write(buf: ByteBuffer, v: T) {
            buf.putLong(get(v) xor Long.MIN_VALUE)
        }
        override fun read(buf: ByteBuffer) = buf.long xor Long.MIN_VALUE
    })

    fun build(ctor: (List<Any>) -> T) = OrderedCodec(ctor, fields)
}

inline fun <reified T : Any> orderedCodec(
    noinline ctor: (List<Any>) -> T,
    block: CodecBuilder<T>.() -> Unit
): BinarySerializer<T> = CodecBuilder<T>().apply(block).build(ctor)

Then, to create a key that sorts by the user email first, then the id, and then the timestamp, the ordered codec can be created as follows

data class UserKey(val email: String, val id: Int, val ts: Long)

val userKeySer = orderedCodec<UserKey>(
    ctor = { (e, i, t) -> UserKey(e as String, i as Int, t as Long) }
) {
    string { it.email } // 1st sort field
    int { it.id }       // 2nd
    long { it.ts }      // 3rd
}

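This greatly reduces object allocations in the critical path: the encoded bytes already sort correctly as raw bytes, so comparing two keys no longer needs to deserialize either of them.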
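
One caveat for string fields inside a key: how the string is framed matters for ordering. The sketch below compares two framings under unsigned bytewise comparison, a 2-byte length prefix and a 0x00 terminator. The function names are made up for illustration, and the terminator variant assumes strings never contain a NUL character.

```kotlin
import java.util.Arrays

// Length-prefixed: 2-byte big-endian length, then the UTF-8 bytes.
fun lengthPrefixed(s: String): ByteArray {
    val b = s.toByteArray(Charsets.UTF_8)
    return byteArrayOf((b.size shr 8).toByte(), b.size.toByte()) + b
}

// Terminated: the UTF-8 bytes followed by a single 0x00 byte.
fun terminated(s: String): ByteArray = s.toByteArray(Charsets.UTF_8) + 0.toByte()

// Sort words by the unsigned bytewise order of their encoded form.
fun sortByEncoding(words: List<String>, enc: (String) -> ByteArray): List<String> =
    words.sortedWith(Comparator { a, b -> Arrays.compareUnsigned(enc(a), enc(b)) })
```

With the length prefix, "b" sorts before "aa" because the length bytes are compared first; with the terminator, the order is the usual "a", "aa", "b". A length prefix is still fine for values, where ordering does not matter.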

Attempt 3 - auto codec

So if an ordered codec can be built by passing the fields in, can it also be created by inspecting the class directly? That would avoid having to write unnecessary boilerplate transformer code.

With the help of reflection we can walk the constructor and emit the right writers/readers for each type.

import java.io.ByteArrayOutputStream
import java.io.DataInput
import java.io.DataOutput
import java.time.Instant
import kotlin.reflect.KClass
import kotlin.reflect.full.memberProperties
import kotlin.reflect.full.primaryConstructor

// Note: Field here is the DataOutput/DataInput flavour, and TypedCodec pairs the
// constructor with these fields; neither is shown in this post.
inline fun <reified T : Any> autoCodec(): BinarySerializer<T> = autoCodec(T::class)

fun <T : Any> autoCodec(kClass: KClass<T>): BinarySerializer<T> {
    val ctor = kClass.primaryConstructor
        ?: error("$kClass has no primary constructor")

    // one Field per constructor parameter, in declaration order
    val fields = ctor.parameters.map { param ->
        val prop = kClass.memberProperties.first { it.name == param.name }

        val type = prop.returnType.classifier
        object : Field<T> {
            override fun write(out: DataOutput, v: T) {
                when (type) {
                    String::class -> {
                        // UTF-8 bytes plus a 0x00 terminator keep lexicographic order;
                        // a length prefix would not. Assumes no NUL in the string.
                        out.write((prop.get(v) as String).toByteArray(Charsets.UTF_8))
                        out.writeByte(0)
                    }
                    Int::class -> out.writeInt((prop.get(v) as Int) xor Int.MIN_VALUE)
                    Long::class -> out.writeLong((prop.get(v) as Long) xor Long.MIN_VALUE)
                    Boolean::class -> out.writeByte(if (prop.get(v) as Boolean) 1 else 0)
                    Float::class -> {
                        // positives: flip the sign bit; negatives: flip every bit
                        val bits = java.lang.Float.floatToRawIntBits(prop.get(v) as Float)
                        out.writeInt(bits xor ((bits shr 31) and Int.MAX_VALUE) xor Int.MIN_VALUE)
                    }
                    Double::class -> {
                        val bits = java.lang.Double.doubleToRawLongBits(prop.get(v) as Double)
                        out.writeLong(bits xor ((bits shr 63) and Long.MAX_VALUE) xor Long.MIN_VALUE)
                    }
                    Instant::class -> {
                        // seconds first, then nanos (always 0..999_999_999)
                        val i = prop.get(v) as Instant
                        out.writeLong(i.epochSecond xor Long.MIN_VALUE)
                        out.writeInt(i.nano)
                    }
                    else -> error("autoCodec: unsupported type $type for ${prop.name}")
                }
            }

            override fun read(inp: DataInput): Any = when (type) {
                String::class -> {
                    val bytes = ByteArrayOutputStream()
                    while (true) {
                        val b = inp.readByte().toInt()
                        if (b == 0) break
                        bytes.write(b)
                    }
                    String(bytes.toByteArray(), Charsets.UTF_8)
                }
                Int::class -> inp.readInt() xor Int.MIN_VALUE
                Long::class -> inp.readLong() xor Long.MIN_VALUE
                Boolean::class -> inp.readByte().toInt() != 0
                Float::class -> {
                    val b2 = inp.readInt() xor Int.MIN_VALUE
                    java.lang.Float.intBitsToFloat(b2 xor ((b2 shr 31) and Int.MAX_VALUE))
                }
                Double::class -> {
                    val b2 = inp.readLong() xor Long.MIN_VALUE
                    java.lang.Double.longBitsToDouble(b2 xor ((b2 shr 63) and Long.MAX_VALUE))
                }
                Instant::class ->
                    Instant.ofEpochSecond(inp.readLong() xor Long.MIN_VALUE, inp.readInt().toLong())
                else -> error("autoCodec: unsupported type $type for ${prop.name}")
            }
        }
    }
    return TypedCodec(ctor, fields)
}
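
The Float and Double branches are the least obvious part, so here is the same bit trick isolated for Double as a sketch. It assumes unsigned bytewise comparison of big-endian bytes (`Arrays.compareUnsigned` stands in for the store's comparator), and the function names are made up for illustration.

```kotlin
import java.nio.ByteBuffer
import java.util.Arrays

// Positive doubles get only the sign bit flipped; negative doubles get every
// bit flipped. The resulting big-endian bytes sort in numeric order.
fun encodeDouble(d: Double): ByteArray {
    val bits = java.lang.Double.doubleToRawLongBits(d)
    val sortable = bits xor ((bits shr 63) and Long.MAX_VALUE) xor Long.MIN_VALUE
    return ByteBuffer.allocate(8).putLong(sortable).array()
}

// Inverse of encodeDouble: undo the sign flip, then undo the conditional flip.
fun decodeDouble(b: ByteArray): Double {
    val flipped = ByteBuffer.wrap(b).long xor Long.MIN_VALUE
    return java.lang.Double.longBitsToDouble(flipped xor ((flipped shr 63) and Long.MAX_VALUE))
}

// Sorts on the encoded bytes only, then decodes.
fun sortDoublesByBytes(values: List<Double>): List<Double> =
    values.map(::encodeDouble)
        .sortedWith(Comparator { a, b -> Arrays.compareUnsigned(a, b) })
        .map(::decodeDouble)
```

The reason negatives need the full flip is that IEEE 754 stores magnitude, so a more negative number has larger raw bits; inverting them reverses that.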

Then, to index bug reports by priority, we can create a sorted map table like

data class BugReport(
    val title: String,
    val priority: Int,
    val created: Instant,
    val id: String = java.util.UUID.randomUUID().toString()
)

data class BugReportByPriorityIndex(val priority: Int, val created: Instant)

fun main() {
    val db = RocksDbDatabase(Path.of("./data"))

    // no manual field list
    val bugPK = autoCodec<BugReportByPriorityIndex>()
    val bugData = autoCodec<BugReport>()

    val bugs = db.openTree("bugs", bugPK, bugData)
    // ... same puts and scans work
}

Attempt 4 - varying the sort order

When I used the above DB I noticed that if two bug reports have the same priority, they are then sorted in ascending order of created timestamp, i.e. the one created earlier is shown first. What I wanted was for the most recent one to show up first, i.e. the highest priority [1] with the most recently submitted report at the top of the list. So now I wanted the ability to specify a different sort direction for each component of the key. That led to a new @Desc annotation and a few tweaks in the codec to write the bitwise inverse of the encoded bytes, which makes that field sort in the opposite order.
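
A small sketch of why inverting the bits works, assuming unsigned bytewise key comparison. For brevity it uses epoch milliseconds in place of an Instant, and the function names are made up for illustration.

```kotlin
import java.nio.ByteBuffer
import java.util.Arrays

// Compound key: priority ascending, creation time descending.
fun encodeKey(priority: Int, createdMillis: Long): ByteArray =
    ByteBuffer.allocate(12)
        .putInt(priority xor Int.MIN_VALUE)                 // ascending
        .putLong((createdMillis xor Long.MIN_VALUE).inv())  // descending: invert every bit
        .array()

// Sort (priority, createdMillis) pairs by their encoded bytes only.
fun sortKeys(keys: List<Pair<Int, Long>>): List<Pair<Int, Long>> =
    keys.sortedWith(Comparator { a, b ->
        Arrays.compareUnsigned(encodeKey(a.first, a.second), encodeKey(b.first, b.second))
    })
```

Inverting maps the largest encoded value to the smallest and vice versa, so within one priority the newest timestamp comes first while priorities themselves still ascend.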

@Target(AnnotationTarget.PROPERTY)
@Retention(AnnotationRetention.RUNTIME)
annotation class Desc

// in TypedBuilder
fun int(prop: KProperty1<T, Int>, desc: Boolean = false) = fields.add(object : Field<T> {
    override fun write(o: DataOutput, v: T) {
        val enc = prop.get(v) xor Int.MIN_VALUE
        // inverting every bit reverses the unsigned byte order
        o.writeInt(if (desc) enc.inv() else enc)
    }
    override fun read(i: DataInput) =
        (if (desc) i.readInt().inv() else i.readInt()) xor Int.MIN_VALUE
})
fun intDesc(p: KProperty1<T, Int>) = int(p, true)

fun long(p: KProperty1<T, Long>, desc: Boolean = false) = fields.add(object : Field<T> {
    override fun write(o: DataOutput, v: T) {
        val e = p.get(v) xor Long.MIN_VALUE
        o.writeLong(if (desc) e.inv() else e)
    }
    override fun read(i: DataInput) =
        (if (desc) i.readLong().inv() else i.readLong()) xor Long.MIN_VALUE
})
fun longDesc(p: KProperty1<T, Long>) = long(p, true)

fun instant(p: KProperty1<T, Instant>, desc: Boolean = false) = fields.add(object : Field<T> {
    override fun write(o: DataOutput, v: T) {
        val i = p.get(v)
        var s = i.epochSecond xor Long.MIN_VALUE
        var n = i.nano
        if (desc) { s = s.inv(); n = n.inv() }
        o.writeLong(s)
        o.writeInt(n)
    }
    override fun read(i: DataInput): Any {
        var s = i.readLong()
        var n = i.readInt()
        if (desc) { s = s.inv(); n = n.inv() }
        // after undoing the inversion, n is back in 0..999_999_999
        return Instant.ofEpochSecond(s xor Long.MIN_VALUE, n.toLong())
    }
})
fun instantDesc(p: KProperty1<T, Instant>) = instant(p, true)

Now the index key becomes

data class BugReportByPriorityIndex(
    val priority: Int,         // <-- ascending
    @Desc val created: Instant // <-- descending
)

fun main() {
    val db = RocksDbDatabase(Path.of("./data"))
    val bugPK = autoCodec<BugReportByPriorityIndex>()
    val v1 = autoCodec<BugReport>()
    val bugsDB: TreeTable<BugReportByPriorityIndex, BugReport> = db.openTree("bugs_by_priority", bugPK, v1)
    bugsDB.put(BugReportByPriorityIndex(1, Instant.now()), BugReport("jvm getting npe", 1, Instant.now()))
    bugsDB.put(BugReportByPriorityIndex(1, Instant.now()), BugReport("highest priority", 1, Instant.now()))
    println(bugsDB.first())
    println(bugsDB.last())
    db.close()
}

Attempt 5 - versioning and schema updates

As the app evolves, so will the data class schemas. How would that be accommodated here? The easiest way is to tack a version byte onto the front of each serialized value.

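class Versioned<V : Any>(
    private val current: Int,
    // one codec per schema version; older versions decode to their own, older types
    private val codecs: Map<Int, BinarySerializer<out Any>>,
    private val migrate: (Any) -> V
) : BinarySerializer<V> {
    @Suppress("UNCHECKED_CAST")
    override fun serialize(v: V): ByteArray {
        // the codec registered under `current` must be the one for V
        val body = (codecs.getValue(current) as BinarySerializer<V>).serialize(v)
        return byteArrayOf(current.toByte()) + body
    }

    @Suppress("UNCHECKED_CAST")
    override fun deserialize(b: ByteArray): V {
        val ver = b[0].toInt() and 0xff // treat the version byte as unsigned
        val body = b.copyOfRange(1, b.size)
        val old = codecs[ver]?.deserialize(body) ?: error("no codec for schema version $ver")
        return if (ver == current) old as V else migrate(old)
    }
}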

Then the data class definition becomes

// v1
data class BugReportV1(val title: String, val priority: Int)
val v1 = autoCodec<BugReportV1>()

// v2 adds created
data class BugReportV2(val title: String, val priority: Int, val created: Instant)
val v2 = autoCodec<BugReportV2>()

val bugData = Versioned(
    current = 2,
    codecs = mapOf(1 to v1 as BinarySerializer<Any>, 2 to v2 as BinarySerializer<Any>),
    migrate = { old ->
        when (old) {
            // v1 records have no timestamp, so default it to the epoch
            is BugReportV1 -> BugReportV2(old.title, old.priority, Instant.EPOCH)
            else -> old as BugReportV2
        }
    }
)
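
To show just the wire layout without the codec machinery, here is a toy round trip. The NoteV1/NoteV2 types and the function names are hypothetical stand-ins, and the payload is plain UTF-8 rather than the ordered encoding used above.

```kotlin
// Hypothetical toy types standing in for two schema versions.
data class NoteV1(val text: String)
data class NoteV2(val text: String, val pinned: Boolean)

// Wire layout: [version byte][payload].
fun encodeV1(n: NoteV1): ByteArray =
    byteArrayOf(1) + n.text.toByteArray(Charsets.UTF_8)

fun encodeV2(n: NoteV2): ByteArray =
    byteArrayOf(2, (if (n.pinned) 1 else 0).toByte()) + n.text.toByteArray(Charsets.UTF_8)

// Always returns the current shape; v1 records are upgraded with a default.
fun decode(bytes: ByteArray): NoteV2 = when (val ver = bytes[0].toInt() and 0xff) {
    1 -> NoteV2(String(bytes, 1, bytes.size - 1, Charsets.UTF_8), pinned = false)
    2 -> NoteV2(String(bytes, 2, bytes.size - 2, Charsets.UTF_8), pinned = bytes[1].toInt() != 0)
    else -> error("unknown schema version $ver")
}
```

Old records are never rewritten in this scheme; they are upgraded on read, so the v1 decoder has to stay around for as long as v1 bytes may still be on disk.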

Usage is something like

// Step 1: only v1 exists, so it is both the stored and the current schema
fun main() {
    val db = RocksDbDatabase(Path.of("./data"))
    val bugPK = autoCodec<BugReportByPriorityIndex>()
    val v1 = autoCodec<BugReportV1>()

    val bugSchema = Versioned(
        current = 1,
        codecs = mapOf(1 to v1 as BinarySerializer<Any>),
        migrate = { it as BugReportV1 }
    )

    val bugsDB = db.openTree("bugs_by_priority", bugPK, bugSchema)
    println(bugsDB.first())
    db.close()
}

// Step 2: add v2; v1 stays registered so old records can still be read
fun main() {
    val db = RocksDbDatabase(Path.of("./data"))
    val bugPK = autoCodec<BugReportByPriorityIndex>()

    val v1 = autoCodec<BugReportV1>() // old
    val v2 = autoCodec<BugReportV2>() // new

    val bugSchema = Versioned(
        current = 2, // write new objects as v2
        codecs = mapOf(1 to v1 as BinarySerializer<Any>, 2 to v2 as BinarySerializer<Any>),
        migrate = { old ->
            when (old) {
                is BugReportV1 -> BugReportV2(old.title, old.priority, Instant.EPOCH)
                else -> old as BugReportV2
            }
        }
    )

    val bugsDB = db.openTree("bugs_by_priority", bugPK, bugSchema)

    val key = BugReportByPriorityIndex(1, Instant.now())
    bugsDB.put(key, BugReportV2("jvm getting npe", 1, Instant.now()))

    // read: the version byte is checked; if it is 0x01, migrate runs
    println(bugsDB.first())

    db.close()
}

At this point I decided to just use sqlite instead :)

The full codec file can be found here