RPC over Websockets [or TCP]

Introduction

I write a lot of server-side code. A common theme in server-side code is that there is usually a client that connects over a network and sends messages/requests to communicate with the server.

I want a network protocol that allows for RPCs [sum(a,b)] instead of REST-style resource requests [/sum?a&b]. I want full-duplex communication as well as single request-response semantics. I want better performance by pipelining all network messages over a single connection. HTTP and gRPC are great, but I think there is an underrated middle ground: rpc over websocket. I tend not to see a lot of apps written using rpc over websocket, probably because of the poor dev ergonomics when building the client and server layers.

The first thing that is usually missing is a way to do a request/response style workload, which is the most common one. Websockets are stream/message oriented: messages are sent, but there is no built-in semantic that provides a ‘response’ to the message that was just sent. We have to write that part. HTTP has the lead here because its request-response paradigm is extremely ergonomic for developers.

This post describes the components I coded up that allow me to build rpc-ws based backends [and clients]. They support all of the requirements above and seamlessly blend the messages coming over the wire into Kotlin's native coroutine primitives such as Flows.

It would be really awesome if the client and server could be modelled as native Kotlin flows on both sides, with a websocket/tcp pipe seamlessly moving the data between them.

Since websockets and TCP sockets are both WriteStream<Buffer> in Vert.x, the underlying network layer can be either one while the rest of the semantics stay the same. That is the benefit of good interface design.

Serialization

Unlike gRPC, the apis are not strongly typed. The client and server both receive json in their handlers, which is a stripped-down version of the json that came over the wire, i.e. the serialization protocol is json and the transport is websockets. There are no byte-encoded messages like protobuf or Cap’n Proto.

It is difficult to get strongly typed apis without writing a template engine or a compiler, and I didn’t want a ‘generator’ style library. I wanted real functions with real intellisense support. The client can easily convert the json to a pojo using the toJson and fromJson convenience methods provided.
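The helpers themselves are not shown in this post. As a rough sketch only, they could be thin wrappers around Vert.x's `JsonObject.mapFrom` and `mapTo`, which rely on Jackson databind being on the classpath. The `Greeting` class is a made-up payload type; it uses default values so that plain Jackson can construct it, whereas data classes without defaults (like the ones later in this post) would presumably also need the Jackson Kotlin module registered.

```kotlin
import io.vertx.core.json.JsonObject

// Hypothetical payload type. The default values give it a no-arg constructor,
// so plain Jackson can rebuild it without the Kotlin module.
data class Greeting(var name: String = "", var count: Int = 0)

// Any object -> JsonObject, via Vert.x's Jackson-backed mapper.
fun Any.toJson(): JsonObject = JsonObject.mapFrom(this)

// JsonObject -> typed object; the reified type lets callers write fromJson<Greeting>(json)
// or let the compiler infer the target type from the call site.
inline fun <reified T> fromJson(json: JsonObject): T = json.mapTo(T::class.java)

fun main() {
    val json = Greeting("hello", 2).toJson()
    val roundTripped: Greeting = fromJson(json)
    println(json.encode())
    println(roundTripped)
}
```

Because the target type is reified, a call such as `shaService.sha(fromJson(request))` can pick up the expected type from the parameter it is passed to.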

Protocol function signatures

There are 4 types of communication I wanted to support.

  1. Request-Response
  2. Server to client stream
  3. Client to server stream
  4. Server-Client Bidirectional stream

The above patterns show how heavily this library is inspired by gRPC. That is because I spent years writing gRPC services professionally and saw first hand what worked and what didn’t.

The communication styles are mapped into the following kotlin function signatures. The server or the service being exposed needs to register handlers like these.

// request response
suspend fun rpcHandler(request: JsonObject): JsonObject
// input is a flow of messages sent by the client - the output is a single json that is sent back to the client
suspend fun clientStreamHandler(requests: Flow<JsonObject>): JsonObject
// input is a single message from the client - all messages emitted on the returned flow are streamed to the client
suspend fun serverStreamHandler(request: JsonObject): Flow<JsonObject>
// input is a flow of messages sent by the client - all messages emitted on the returned flow are streamed to the client
suspend fun bidiStreamHandler(requests: Flow<JsonObject>): Flow<JsonObject>

Sha service server side

An example Sha service [returns the Sha256 of the input string] would look something like the code below. The service holds the actual business logic, and the handlers connect it to the rpc server using the function signatures shown above.

import io.vertx.core.json.JsonObject
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import java.security.MessageDigest
// fromJson and toJson are the convenience helpers provided by the library

class ShaService {
    data class ShaRequest(val data: String)
    data class ShaResponse(val sha: String)

    fun sha(shaRequest: ShaRequest): ShaResponse {
        val digest = MessageDigest.getInstance("SHA-256").digest(shaRequest.data.encodeToByteArray())
        return ShaResponse(digest.toHexString())
    }

    // lowercase hex encoding of the digest bytes
    private fun ByteArray.toHexString() = joinToString("") { "%02x".format(it) }
}

class ShaRpcHandlers(
    private val shaService: ShaService = ShaService(),
    private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)
) {
    suspend fun shaHandler(request: JsonObject): JsonObject {
        return shaService.sha(fromJson(request)).toJson()
    }

    suspend fun shaClientStreamHandler(requests: Flow<JsonObject>): JsonObject {
        val stringBuilder = StringBuilder()
        requests
            .map { fromJson<ShaService.ShaRequest>(it) } // <-- input messages coming in over the network from the client
            .onEach { stringBuilder.append(it.data) } // <--- do business logic
            .catch { println("sha service flow had error ${it.message}") }
            .collect()
        // return is sent to the client
        return shaService.sha(ShaService.ShaRequest(stringBuilder.toString())).toJson()
    }

    suspend fun shaServerStreamHandler(request: JsonObject): Flow<JsonObject> {
        val clientShaRequest = fromJson<ShaService.ShaRequest>(request)
        val shaResponse = shaService.sha(clientShaRequest)
        // messages emitted on the flow are streamed to the client over the network
        // the flow is cold: it only runs when the rpc server collects it, so it is not launched here
        return flow { repeat(5) { emit(ShaService.ShaResponse("server streamed $it - ${shaResponse.sha}").toJson()) } }
    }

    suspend fun shaBidiStreamHandler(requests: Flow<JsonObject>): Flow<JsonObject> {
        val serverStreamChannel = Channel<JsonObject>()

        // consume client messages in the background and push one response per message into the channel
        requests.map { fromJson<ShaService.ShaRequest>(it) }
            .onEach { serverStreamChannel.send(ShaService.ShaResponse("bidi server streamed ${it.data} - ${shaService.sha(it).sha}").toJson()) }
            .catch { it.printStackTrace() }
            .onCompletion { serverStreamChannel.close() }
            .launchIn(scope)

        // messages written to the channel are streamed to the client
        return serverStreamChannel.receiveAsFlow()
    }
}

The service handlers are bound to a registry and a server is started as follows

val shaRpcHandlers = ShaRpcHandlers()
val serviceRegistry = ServiceRegistry()
serviceRegistry.registerUnaryHandler("sha", shaRpcHandlers::shaHandler)
serviceRegistry.registerClientStreamHandler("sha", shaRpcHandlers::shaClientStreamHandler)
serviceRegistry.registerServerStreamHandler("sha", shaRpcHandlers::shaServerStreamHandler)
serviceRegistry.registerBidiStreamHandler("sha", shaRpcHandlers::shaBidiStreamHandler)

val rpcServer = RpcServer(serviceRegistry)
rpcServer.startWsServer()

Whenever the client makes a ‘sha’ request, the handler matching that request's communication style is called, i.e. the same rpc name can be shared across different communication styles.

Request response [client side]

The most important part of the project was supporting request-response code flow: the next line of code runs only once the response to the request has been received. Most websocket/tcp libraries do not support this structure because they are stream/message oriented and have no concept of a ‘response’, so there is no way to ‘wait’ before going to the next line.

The client has a function like suspend fun makeRpc(rpcName: String, requestBody: JsonObject): Result<JsonObject> to make unary rpc requests.

From the client side it looks like this:

val rpcClient = RpcClient().connectWs() // or connectTcp()
val unaryResponse = rpcClient.makeRpc("sha", ShaService.ShaRequest("hello").toJson())
assert(unaryResponse.isSuccess)
assert(unaryResponse.getOrThrow().containsKey("sha"))
// the service hex-encodes the digest in lowercase ("%02x"), so compare against the lowercase form
assert(unaryResponse.getOrThrow().getString("sha") == "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824")

The code only moves on to the next line once the response is received, even though the message was sent over websocket [or tcp].

The key idea behind getting the code to wait for a response is that receiving from an empty channel or flow is a suspending call. Suspending means that the code ‘waits’ on that line until the call completes, without blocking the underlying thread, before moving on to the next statement. It can be thought of as syntactic sugar over a callback.

We send a message to the server over the socket with an ID, place a channel/flow in a hashmap under that ID, and then immediately suspend on receiving from it. When the client’s incoming message handler receives a message, it checks whether the hashmap holds a channel for that ID. If it does, it writes the incoming message to that channel, which in turn un-suspends the original call, and the client gets the response in a non-blocking but synchronous-looking manner.
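The correlation idea can be tried out without any networking. The following is only a sketch under assumptions: `Frame`, `CorrelatingClient`, `call` and `onMessage` are made-up names, strings stand in for json, and the "server" is a coroutine that replies after a short delay. It is not the library's client, just the hashmap-of-channels mechanism in isolation.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for a wire message: a request ID plus a text body.
data class Frame(val requestId: String, val body: String)

class CorrelatingClient(private val sendToServer: suspend (Frame) -> Unit) {
    // One channel per in-flight request, keyed by request ID.
    private val activeCalls = ConcurrentHashMap<String, Channel<String>>()

    suspend fun call(body: String): String {
        val id = UUID.randomUUID().toString()
        val reply = Channel<String>(capacity = 1)
        activeCalls[id] = reply              // register before sending so a fast reply is not lost
        try {
            sendToServer(Frame(id, body))
            return reply.receive()           // suspends, without blocking the thread, until onMessage delivers
        } finally {
            activeCalls.remove(id)
        }
    }

    // Would be called by the socket's message handler for every incoming frame.
    fun onMessage(frame: Frame) {
        activeCalls[frame.requestId]?.trySend(frame.body)
    }
}

fun main() = runBlocking {
    lateinit var client: CorrelatingClient
    // Fake server: replies asynchronously with the upper-cased body under the same ID.
    client = CorrelatingClient { frame ->
        launch {
            delay(10)
            client.onMessage(Frame(frame.requestId, frame.body.uppercase()))
        }
    }
    val response = client.call("hello")      // reads like a blocking call, but only suspends
    println(response)
}
```

Registering the channel before writing to the socket matters: if the reply could arrive before the entry exists, it would be dropped as an unknown ID.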

The client snippet looks like this

suspend fun makeRpc(rpcName: String, requestBody: JsonObject): Result<JsonObject> {
    // the rpc header has an autogen ID
    val rpcHeader = RpcHeader(rpcName)
    // this is the channel that the incoming message with the same ID will be written to
    val receive = Channel<Result<JsonObject>>()
    // add the channel to a hashmap
    activeCalls[rpcHeader.requestID] = ActiveRequest(receive)
    // send the request to the server with the ID in the header
    socket.write(createNetworkMessage(rpcHeader, requestBody).toBuffer().appendString("\n"))
        .onFailure { it.printStackTrace() }
        .await()
    // client suspends here until the response comes back - optionally can add a timeout using withTimeout
    return receive.consumeAsFlow()
        .onCompletion { activeCalls.remove(rpcHeader.requestID) } // when message received remove it from the hashmap
        .catch { logger.error("received error when getting response for unary rpc", it) }
        .first() // returns a Result which can either be Success<JsonObject> or Failure<Throwable>
}
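The comment in the snippet mentions an optional timeout. One possible way to do that, sketched here with a made-up helper name `awaitReply` and plain strings in place of json, is `withTimeoutOrNull`, which turns a missing reply into `null` instead of an exception. This is an illustration of the coroutine primitive, not code from the library.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Waits for a single reply, or returns null if nothing arrives within timeoutMs.
suspend fun <T : Any> awaitReply(reply: Channel<T>, timeoutMs: Long): T? =
    withTimeoutOrNull(timeoutMs) { reply.receive() }

fun main() = runBlocking {
    val answered = Channel<String>(1)
    answered.send("ok")                    // a reply is already buffered
    println(awaitReply(answered, 100))

    val silent = Channel<String>()         // nobody ever replies on this one
    println(awaitReply(silent, 50))
}
```

In a real client the timed-out entry would also have to be removed from the active-calls map, otherwise abandoned requests would accumulate there.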

Incoming messages from the server are dispatched out to the appropriate flows based on the IDs

val webSocket = vertx.createHttpClient().webSocket(port, host, path).await()
// the handler callback is not a suspend function, so each message is processed in a coroutine
// (dispatcher is the same CoroutineScope that is passed to launchIn in the bidi stream code)
webSocket.binaryMessageHandler { buffer -> dispatcher.launch { handleServerMessage(buffer.toString()) } }

suspend fun handleServerMessage(serverMsg: String) {
    // parse the raw text into json before reading the header and payload
    val receivedMsg = JsonObject(serverMsg)
    val header = receivedMsg.getJsonObject("header").mapTo(RpcHeader::class.java)
    when (val activeCall = activeCalls[header.requestID]) {
        is ActiveRequest -> forwardMessageToActiveCall(activeCall, header.isErr, receivedMsg.getJsonObject("payload"))
        is ActiveStream -> forwardMessageToActiveStream(activeCall, header, receivedMsg.getJsonObject("payload"))
        null -> logger.warn("received request id ${header.requestID} was not found in active calls")
    }
}

suspend fun forwardMessageToActiveCall(activeRequest: ActiveRequest, isErr: Boolean, payload: JsonObject) {
    val result = if (isErr) Result.failure(fromJson(payload)) else Result.success(payload)
    // a unary call gets exactly one message, so close the channel right after delivering it
    activeRequest.channel.send(result)
    activeRequest.channel.close()
}

Server stream [client side]

A request-response cycle can be thought of as a server stream with only 1 message. If we drop that single-message restriction, we get the code for server stream requests, where multiple messages can be received.

suspend fun makeServerStreamRpc(rpcName: String, requestBody: JsonObject): Flow<JsonObject> {
    val rpcHeader = RpcHeader(rpcName, MethodType.SERVER_STREAM)
    val receive = Channel<JsonObject>()
    activeCalls[rpcHeader.requestID] = ActiveStream(receive)
    socket.write(createNetworkMessage(rpcHeader, requestBody).toBuffer().appendString("\n")).await()
    // this flow will forward the messages from the server to the client code using forwardMessageToActiveStream
    return receive.receiveAsFlow()
        .onCompletion { activeCalls.remove(rpcHeader.requestID) }
        .catch { logger.error("received error when passing messages from server ws and sending to client flow", it) }
}
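The post does not show forwardMessageToActiveStream, so the following is a guess at its general shape rather than the real thing. It assumes, based on the `end` flag used on the header elsewhere in this post, that the server marks its last message, and that the client closes the channel at that point so the flow returned to the caller completes. `StreamFrame` and `forwardToStream` are made-up names and strings stand in for json.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical frame: end = true marks the server's final message for a stream.
data class StreamFrame(val payload: String, val end: Boolean = false)

// Routes one incoming frame into the channel that backs the caller's flow.
suspend fun forwardToStream(channel: Channel<String>, frame: StreamFrame) {
    if (frame.end) channel.close() else channel.send(frame.payload)
}

fun main() = runBlocking {
    val channel = Channel<String>()
    // Simulated incoming server messages: two payloads followed by the end marker.
    launch {
        listOf(StreamFrame("a"), StreamFrame("b"), StreamFrame("", end = true))
            .forEach { forwardToStream(channel, it) }
    }
    // Collection finishes on its own once the channel is closed.
    val received = channel.receiveAsFlow().toList()
    println(received)
}
```

Closing the channel is what lets a caller's `collect` return normally; without an end marker the returned flow would stay open for as long as the entry lives in the map.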

Bidi Stream [client]

A bidirectional stream is when a client and server are both asynchronously sending messages to each other. A flow is taken in by the client method that will be streamed out to the server and a flow is returned from the client method which will receive messages that the server is sending for this rpc.

suspend fun makeBidiStreamRpc(rpcName: String, messages: Flow<JsonObject>): Flow<JsonObject> {
    val rpcHeader = RpcHeader(rpcName = rpcName, MethodType.BIDI_STREAM)
    val receive = Channel<JsonObject>()
    activeCalls[rpcHeader.requestID] = ActiveStream(receive)
    // messages on this flow come from the client and are to be streamed out to the server
    messages.onEach {
        socket.write(createNetworkMessage(rpcHeader, it).toBuffer().appendString("\n")).await()
    }.onCompletion {
        // tell the server that the client side of the stream has finished
        socket.write(createNetworkMessage(rpcHeader.copy(end = true)).toBuffer().appendString("\n")).await()
    }.catch {
        logger.error("received error when taking in messages from client flow and sending to server over ws", it)
    }.launchIn(dispatcher) // asynchronously take in messages that the client is sending
    // the returning flow receives messages that are coming from the server
    return receive.receiveAsFlow().onCompletion { activeCalls.remove(rpcHeader.requestID) }
}

Running handlers server side

Since service handlers are regular functions and functions are first class citizens in kotlin we can easily connect messages coming in from the client over the network to invoke appropriate handlers.

The handlers are kept in a hashmap, which is the service registry

val clientStreamHandlers = HashMap<String, suspend (Flow<JsonObject>) -> JsonObject>()
val serverStreamHandlers = HashMap<String, suspend (JsonObject) -> Flow<JsonObject>>()
val bidiStreamHandlers = HashMap<String, suspend (Flow<JsonObject>) -> Flow<JsonObject>>()
val unaryHandlers = HashMap<String, suspend (JsonObject) -> JsonObject>()
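To make the lookup concrete, here is a cut-down sketch of such a registry with only two of the four maps. It is an illustration under assumptions: `MiniRegistry` is a made-up name, and a `Map<String, Any?>` alias stands in for JsonObject so the example needs nothing beyond kotlinx.coroutines. It also shows the same rpc name being registered under two communication styles.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Stand-in for Vert.x JsonObject, used only to keep this sketch dependency-light.
typealias Json = Map<String, Any?>

class MiniRegistry {
    private val unaryHandlers = HashMap<String, suspend (Json) -> Json>()
    private val serverStreamHandlers = HashMap<String, suspend (Json) -> Flow<Json>>()

    fun registerUnaryHandler(name: String, handler: suspend (Json) -> Json) {
        unaryHandlers[name] = handler
    }

    fun registerServerStreamHandler(name: String, handler: suspend (Json) -> Flow<Json>) {
        serverStreamHandlers[name] = handler
    }

    // Lookups return null when nothing is registered under that name.
    fun getUnaryHandler(name: String) = unaryHandlers[name]
    fun getServerStreamHandler(name: String) = serverStreamHandlers[name]
}

fun main() = runBlocking {
    val registry = MiniRegistry()
    // The same rpc name can live in both maps because each style has its own map.
    registry.registerUnaryHandler("echo") { it }
    registry.registerServerStreamHandler("echo") { request -> flowOf(request, request) }

    val request = mapOf("msg" to "hi")
    val unary = registry.getUnaryHandler("echo")?.invoke(request)
    val streamed = registry.getServerStreamHandler("echo")?.invoke(request)?.toList()
    println(unary)
    println(streamed)
    println(registry.getUnaryHandler("missing"))
}
```

Keeping one map per style means the method type in the header picks the map and the rpc name picks the entry, so no naming convention is needed to tell the styles apart.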

When an rpc request comes in on the server, the matching handler is looked up and called:

suspend fun triggerUnaryRpc(rpcName: String, payload: JsonObject): JsonObject {
    val unaryHandler = serviceRegistry.getUnaryHandler(rpcName) ?: return JsonObject()
    return unaryHandler.invoke(payload)
}

suspend fun handleServerStreamRpcMsg(header: RpcHeader, msg: JsonObject, socket: WriteStream<Buffer>) {
    serviceHandler.triggerServerStreamRpc(header.rpcName, msg.getJsonObject("payload", JsonObject()))
        ?.onEach { forwardFlowMessageToSocket(header, it, socket) } // forward message to client
        ?.onCompletion { writeFinishMessageToSocket(header, socket) }
        ?.launchIn(vertxScope)
}

suspend fun triggerServerStreamRpc(rpcName: String, payload: JsonObject): Flow<JsonObject>? {
    val handler = serviceRegistry.getServerStreamHandler(rpcName) ?: return null
    // messages coming from this flow are streamed to the client
    return handler.invoke(payload)
}

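For bidi stream messages there is a bit more bookkeeping to do, but nothing too complicated: the first message of a stream creates a channel and invokes the handler, and every later message with the same ID is routed into that channel.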

suspend fun handleBidiStreamMsg(header: RpcHeader, msg: JsonObject, socket: WriteStream<Buffer>) {
    serviceHandler.triggerBidiStreamRpc(header.rpcName, header.requestID, header.end, header.isErr, msg.getJsonObject("payload", JsonObject()))
        ?.onEach { forwardFlowMessageToSocket(header, it, socket) } // forward message to client
        ?.onCompletion { writeFinishMessageToSocket(header, socket) }
        ?.launchIn(vertxScope)
}

suspend fun triggerBidiStreamRpc(rpcName: String, id: String, endStream: Boolean, isErr: Boolean, msg: JsonObject): Flow<JsonObject>? {
    if (activeClientStreams.containsKey(id)) {
        sendMessageToActiveFlowStream(id, isErr, msg) // message from client sent to local server side flow
        return null
    }

    // look the handler up first so that no channel is left behind for an unknown rpc name
    val handler = serviceRegistry.getBidiStreamHandler(rpcName) ?: return null
    // first message of this stream: create the channel that later messages with the same ID are routed to
    val channel = Channel<JsonObject>(32)
    activeClientStreams.putIfAbsent(id, channel)
    channel.send(msg)
    // the handler is invoked with the flow that is receiving messages from the client
    // it returns a flow whose messages will be streamed out to the client
    return handler.invoke(channel.receiveAsFlow().onCompletion { activeClientStreams.remove(id) })
}
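The bookkeeping can be exercised in isolation. The sketch below is an assumption-laden miniature, not the server code: `BidiRouter` and `onClientMessage` are made-up names, strings stand in for json, and it assumes an `end` flag closes the client-side channel. The first message for an ID creates the channel and invokes the handler, while follow-up messages are only routed.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentHashMap

class BidiRouter(private val handler: suspend (Flow<String>) -> Flow<String>) {
    private val activeClientStreams = ConcurrentHashMap<String, Channel<String>>()

    // Returns the handler's output flow for the first message of a stream, null for follow-ups.
    suspend fun onClientMessage(id: String, payload: String, end: Boolean): Flow<String>? {
        val existing = activeClientStreams[id]
        if (existing != null) {
            if (end) existing.close() else existing.send(payload)
            return null
        }
        val channel = Channel<String>(32)
        activeClientStreams[id] = channel
        channel.send(payload)                // buffered, so this does not wait for the handler
        return handler(channel.receiveAsFlow().onCompletion { activeClientStreams.remove(id) })
    }

    fun activeStreamCount() = activeClientStreams.size
}

fun main() = runBlocking {
    // Hypothetical handler: upper-cases every client message.
    val router = BidiRouter { requests -> requests.map { it.uppercase() } }

    val responses = router.onClientMessage("1", "a", end = false)!!
    val collected = async { responses.toList() }      // plays the role of forwarding to the socket
    router.onClientMessage("1", "b", end = false)     // routed into the existing channel
    router.onClientMessage("1", "", end = true)       // closes the client side of the stream

    println(collected.await())
    println(router.activeStreamCount())
}
```

The buffered channel is what allows the first message to be stored before the handler has started collecting.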

Conclusion

This project has been baking in the back of my mind for a long time. I was relieved when I got it all coded up and working. It makes writing client-server code really easy.

The next thing is to cluster the servers running this so that they can communicate with each other pub/sub or request-response style. This would most likely be done via Vert.x’s clustered event bus. The two combined should have really solid synergy.