RPC over Websockets [or TCP]
Introduction
I write a lot of server-side code. A common theme in server-side code is a client that connects over the network and sends messages/requests to communicate with the server.
I want a network protocol that allows for RPCs [sum(a,b)] instead of REST-style resource requests [/sum?a&b]. I want full-duplex communication as well as single request-response semantics, and I want better performance by pipelining all network messages over a single connection.
HTTP and gRPC are great, but I think there is an underrated middle ground: RPC over WebSocket. I don't see many apps written this way, probably because of the poor developer ergonomics of building the client and server layers. The first thing usually missing is a way to do request/response, which is the most common workload. WebSockets are stream based: messages are sent, but nothing built in provides a ‘response’ to the message that was just sent. We have to write that part ourselves. HTTP has the lead here because it is extremely ergonomic for the request-response paradigm.
This post describes the various components I coded up that let me build RPC-over-WebSocket (rpc-ws) backends [and clients]. They support all of the requirements above, and seamlessly blend the messages coming over the wire into native Kotlin coroutine primitives such as Flows.
It would be really awesome if the client and server could both be modelled as native Kotlin flows, with a WebSocket/TCP pipe seamlessly moving the data between them.
Since WebSockets and TCP sockets are both WriteStream<Buffer> in Vert.x, the underlying network stream layer can be either, while the rest of the semantics stay the same. That is one of the benefits of good interface design.
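One caveat on the TCP side: a WebSocket delivers whole frames, but a raw TCP NetSocket delivers an unframed byte stream, so a single Buffer handed to the socket handler can hold half a message or two of them. A common remedy, which is an assumption here rather than something the post describes, is to prefix every JSON message with its 4-byte length and reassemble on the receiving side with Vert.x's RecordParser. The helpers frame, sendFramed and frameDecoder are hypothetical names.

```kotlin
import io.vertx.core.buffer.Buffer
import io.vertx.core.json.JsonObject
import io.vertx.core.parsetools.RecordParser
import io.vertx.core.streams.WriteStream

// Hypothetical framing: [4-byte big-endian length][UTF-8 JSON bytes] per message.
fun frame(msg: JsonObject): Buffer {
    val body = msg.toBuffer()
    return Buffer.buffer(4 + body.length()).appendInt(body.length()).appendBuffer(body)
}

// Works unchanged for a WebSocket or a NetSocket, since both are WriteStream<Buffer>.
fun WriteStream<Buffer>.sendFramed(msg: JsonObject) {
    write(frame(msg))
}

// Reassembles whole messages from arbitrarily split chunks.
// Hypothetical usage: netSocket.handler(frameDecoder { msg -> dispatch(msg) })
fun frameDecoder(onMessage: (JsonObject) -> Unit): RecordParser {
    val parser = RecordParser.newFixed(4)          // first record: the 4-byte length header
    var readingHeader = true
    parser.handler { record ->
        if (readingHeader) {
            parser.fixedSizeMode(record.getInt(0)) // next record: exactly that many body bytes
            readingHeader = false
        } else {
            onMessage(record.toJsonObject())
            parser.fixedSizeMode(4)                // then back to reading a header
            readingHeader = true
        }
    }
    return parser
}
```

Over a WebSocket the prefix is redundant because frame boundaries already exist; what matters is that both ends agree, since a framed sender talking to an unframed reader will fail to parse.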
Serialization
Unlike gRPC, the APIs are not strongly typed. The client and server both receive JSON in their handlers, which is a stripped-down version of the JSON that came over the wire, i.e. the serialization format is JSON and the transport is WebSockets. There are no byte-encoded messages like Protobuf or Cap'n Proto.
It is difficult to get strongly typed APIs without writing a template engine or a compiler, and I didn't want a ‘generator’-style library. I wanted real functions with real IntelliSense support. The client can easily convert between JSON and POJOs using the toJson and fromJson convenience methods provided.
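The post doesn't show the toJson/fromJson helpers themselves. A hand-written pair on a Kotlin data class is one plausible shape, sketched here for the Sha example; ShaRequest, ShaResponse and the field names input and sha are hypothetical.

```kotlin
import io.vertx.core.json.JsonObject

// Hypothetical request type for the Sha example; the field name is an assumption.
data class ShaRequest(val input: String) {
    fun toJson(): JsonObject = JsonObject().put("input", input)

    companion object {
        fun fromJson(json: JsonObject): ShaRequest =
            ShaRequest(requireNotNull(json.getString("input")) { "missing 'input'" })
    }
}

// Hypothetical response type; carries the hex-encoded hash.
data class ShaResponse(val sha: String) {
    fun toJson(): JsonObject = JsonObject().put("sha", sha)

    companion object {
        fun fromJson(json: JsonObject): ShaResponse =
            ShaResponse(requireNotNull(json.getString("sha")) { "missing 'sha'" })
    }
}
```

Vert.x also has JsonObject.mapFrom and JsonObject.mapTo, which delegate to Jackson databind; for Kotlin data classes that route typically also needs Jackson's Kotlin module on the classpath.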
Protocol function signatures
There are 4 types of communication I wanted to support.
- Request-Response
- Server to client stream
- Client to server stream
- Server-Client Bidirectional stream
The patterns above show how heavily this library is inspired by gRPC. That is because I spent years writing gRPC services professionally and saw first-hand what worked and what didn't.
The communication styles map onto the following Kotlin function signatures. The server, or rather the service being exposed, needs to register handlers with these shapes.
```kotlin
// request response
```
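Only the first line of the original signature listing survived. Going by the clientStreamHandlers map shown later, which stores suspend (Flow<JsonObject>) -> JsonObject, the four styles plausibly correspond to these Kotlin function types. The typealias names are hypothetical, and the other three shapes are inferred rather than copied from the post.

```kotlin
import io.vertx.core.json.JsonObject
import kotlinx.coroutines.flow.*

// request response: one message in, one message out
typealias UnaryHandler = suspend (JsonObject) -> JsonObject

// server to client stream: one message in, many out
typealias ServerStreamHandler = suspend (JsonObject) -> Flow<JsonObject>

// client to server stream: many in, one out (same shape as the post's clientStreamHandlers map)
typealias ClientStreamHandler = suspend (Flow<JsonObject>) -> JsonObject

// bidirectional stream: many in, many out
typealias BidiStreamHandler = suspend (Flow<JsonObject>) -> Flow<JsonObject>
```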
Sha service [server side]
An example Sha service [returns the SHA-256 hash of the input string] would look something like this.
The service contains the actual business logic, plus the handlers that connect it to the RPC server using the function signatures above.
```kotlin
import io.vertx.core.json.JsonObject
```
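A sketch of what the service might contain, assuming JSON fields named input, sha and rounds: the business logic uses the JDK's MessageDigest for SHA-256, and ShaRpcHandlers exposes one handler per communication style. Only the class name ShaRpcHandlers comes from the post; the method names and what each streaming handler does are invented for illustration.

```kotlin
import io.vertx.core.json.JsonObject
import kotlinx.coroutines.flow.*
import java.security.MessageDigest

// Business logic only: no RPC or JSON concerns.
class ShaService {
    fun sha256(input: String): String =
        MessageDigest.getInstance("SHA-256")
            .digest(input.toByteArray(Charsets.UTF_8))
            .joinToString("") { "%02x".format(it.toInt() and 0xff) }
}

// Glue between JSON messages and the service; hypothetical method names, one per style.
class ShaRpcHandlers(private val service: ShaService = ShaService()) {
    private fun reply(input: String) = JsonObject().put("sha", service.sha256(input))

    // request response: hash one string
    suspend fun sha(req: JsonObject): JsonObject = reply(req.getString("input"))

    // server stream: hash the input, then hash that hash, `rounds` times (assumed semantics)
    suspend fun shaStream(req: JsonObject): Flow<JsonObject> = flow {
        var current = req.getString("input")
        repeat(req.getInteger("rounds", 1)) {
            current = service.sha256(current)
            emit(JsonObject().put("sha", current))
        }
    }

    // client stream: concatenate every incoming chunk, reply with a single hash
    suspend fun shaClientStream(msgs: Flow<JsonObject>): JsonObject {
        val sb = StringBuilder()
        msgs.collect { sb.append(it.getString("input")) }
        return reply(sb.toString())
    }

    // bidi stream: answer every incoming message with its own hash
    suspend fun shaBidi(msgs: Flow<JsonObject>): Flow<JsonObject> =
        msgs.map { reply(it.getString("input")) }
}
```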
The service handlers are bound to a registry, and a server is started, as follows:
```kotlin
val shaRpcHandlers = ShaRpcHandlers()
```
Whenever the client makes a ‘sha’ request, the handler registered for that request's communication style is called, i.e. the same RPC name can be shared across different communication styles.
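The registry isn't shown beyond the clientStreamHandlers map further down. Assuming one map per communication style keyed by RPC name, the same name can be registered four times without collision, and the style of the incoming request decides which map is consulted. RpcRegistry, its methods and the stand-in lambdas are hypothetical; with the post's classes one would pass method references instead, e.g. unary("sha", shaRpcHandlers::sha) if the unary handler were named sha.

```kotlin
import io.vertx.core.json.JsonObject
import kotlinx.coroutines.flow.*

// Hypothetical registry: one map per communication style, each keyed by RPC name.
class RpcRegistry {
    val unaryHandlers = HashMap<String, suspend (JsonObject) -> JsonObject>()
    val serverStreamHandlers = HashMap<String, suspend (JsonObject) -> Flow<JsonObject>>()
    val clientStreamHandlers = HashMap<String, suspend (Flow<JsonObject>) -> JsonObject>()
    val bidiStreamHandlers = HashMap<String, suspend (Flow<JsonObject>) -> Flow<JsonObject>>()

    fun unary(name: String, h: suspend (JsonObject) -> JsonObject) { unaryHandlers[name] = h }
    fun serverStream(name: String, h: suspend (JsonObject) -> Flow<JsonObject>) { serverStreamHandlers[name] = h }
    fun clientStream(name: String, h: suspend (Flow<JsonObject>) -> JsonObject) { clientStreamHandlers[name] = h }
    fun bidiStream(name: String, h: suspend (Flow<JsonObject>) -> Flow<JsonObject>) { bidiStreamHandlers[name] = h }
}

// The same name "sha" is bound once per style; these lambdas stand in for real handlers.
fun buildRegistry(): RpcRegistry = RpcRegistry().apply {
    unary("sha") { req -> JsonObject().put("style", "unary").put("input", req.getString("input")) }
    serverStream("sha") { req -> flowOf(req, req) }
    clientStream("sha") { msgs -> JsonObject().put("style", "clientStream").put("count", msgs.count()) }
    bidiStream("sha") { msgs -> msgs.map { JsonObject().put("style", "bidi").put("input", it.getString("input")) } }
}
```

Starting the server would then mean handing each connection, a ServerWebSocket from HttpServer.webSocketHandler or a NetSocket from NetServer.connectHandler, to code that reads the style and name from each message and looks them up here.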
Request response [client side]
The most important part of the project was supporting request-response style code flow. That is, the next line of code only runs once the response to the request has been received. Most WebSocket/TCP libraries don't support this structure: they are stream/message oriented and have no concept of a ‘response’, so there is no way to ‘wait’ before moving on to the next line.
The client has a function like suspend fun makeRpc(rpcName: String, requestBody: JsonObject): Result<JsonObject> to make unary RPC requests. This is what it looks like from the client side:
```kotlin
val rpcClient = RpcClient().connectWs() // or connectTcp()
```
The code only moves on to the next line once the response is received, even though the message was sent over a WebSocket [or TCP].
The key idea behind getting the code to wait for a response is that receiving a message on an empty channel or flow is a suspending call. Suspending means the code ‘waits’ on that line until the call completes, without blocking the underlying thread, before moving on to the next statement. It can be thought of as syntactic sugar over a callback.
We can register a channel in a hashmap under a request ID, send the message to the server tagged with that ID, and then immediately suspend on receiving from that channel. When the WebSocket's handler for messages coming in from the server receives a message, it checks whether there is a channel in the hashmap for that ID. If there is, it writes the incoming message to that channel, which in turn un-suspends the original method call, and the client gets the response in a non-blocking but sequential manner.
The client snippet looks like this:
```kotlin
suspend fun makeRpc(rpcName: String, requestBody: JsonObject): Result<JsonObject> { /* … */ }
```
Incoming messages from the server are dispatched to the appropriate flows based on their IDs:
```kotlin
val webSocket = vertx.createHttpClient().webSocket(port, host, path).await()
```
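The correlation mechanism described in this section, sketched end to end. Assumptions not taken from the post: the envelope fields id, rpc, body and error, a send function standing in for the socket write, an added timeout parameter, and an in-memory fake server in the test so nothing touches the network. The pending map holds one single-slot Channel per request ID, and onMessage is what the socket's message handler would call. RpcClientCore is a hypothetical name.

```kotlin
import io.vertx.core.json.JsonObject
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical client core; `send` stands in for writing to a WebSocket or NetSocket.
class RpcClientCore(private val send: (JsonObject) -> Unit) {
    private val nextId = AtomicLong()
    private val pending = ConcurrentHashMap<Long, Channel<JsonObject>>()

    suspend fun makeRpc(rpcName: String, requestBody: JsonObject, timeoutMs: Long = 5_000): Result<JsonObject> {
        val id = nextId.incrementAndGet()
        val reply = Channel<JsonObject>(capacity = 1)
        pending[id] = reply   // register first, so a very fast response cannot be missed
        try {
            send(JsonObject().put("id", id).put("rpc", rpcName).put("body", requestBody))
            // Suspends here, without blocking the thread, until onMessage delivers the response.
            val msg = withTimeoutOrNull(timeoutMs) { reply.receive() }
                ?: return Result.failure(IllegalStateException("rpc '$rpcName' timed out"))
            val error = msg.getString("error")
            if (error != null) return Result.failure(RuntimeException(error))
            return Result.success(msg.getJsonObject("body") ?: JsonObject())
        } finally {
            pending.remove(id)    // clean up on success, error and timeout alike
        }
    }

    // Wire this to the socket, e.g. webSocket.handler { buf -> client.onMessage(buf.toJsonObject()) }
    fun onMessage(msg: JsonObject) {
        val id = msg.getLong("id") ?: return   // not a response to anything we sent
        pending[id]?.trySend(msg)              // late replies after a timeout are dropped
    }
}
```

Registering the channel before sending closes a small race: if the reply arrived before the map entry existed, onMessage would drop it and the caller would wait until the timeout.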
Server stream [client side]
A request-response cycle can be thought of as a server stream with exactly one message. Drop that one-message restriction and we get the code for server stream requests, where multiple messages can be received:
```kotlin
suspend fun makeServerStreamRpc(rpcName: String, requestBody: JsonObject): Flow<JsonObject> { /* … */ }
```
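The streaming variant keeps the pending entry alive until the server signals that it is done. A sketch under a hypothetical id/rpc/body envelope plus an assumed end: true flag on the final message; StreamClientCore is a hypothetical name. The channel is unbounded so the socket handler never has to wait for a slow collector, at the cost of unbounded buffering.

```kotlin
import io.vertx.core.json.JsonObject
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical client core; `send` stands in for writing to the socket.
class StreamClientCore(private val send: (JsonObject) -> Unit) {
    private val nextId = AtomicLong()
    private val pending = ConcurrentHashMap<Long, Channel<JsonObject>>()

    suspend fun makeServerStreamRpc(rpcName: String, requestBody: JsonObject): Flow<JsonObject> {
        val id = nextId.incrementAndGet()
        val incoming = Channel<JsonObject>(Channel.UNLIMITED)
        pending[id] = incoming                                 // register before sending
        send(JsonObject().put("id", id).put("rpc", rpcName).put("body", requestBody))
        return incoming.receiveAsFlow()
            .takeWhile { !it.getBoolean("end", false) }       // assumed end-of-stream marker
            .map { it.getJsonObject("body") ?: JsonObject() }
            .onCompletion { pending.remove(id) }               // unregister when done or cancelled
    }

    // Called by the socket's message handler for every incoming message.
    fun onMessage(msg: JsonObject) {
        val id = msg.getLong("id") ?: return
        pending[id]?.trySend(msg)
    }
}
```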
Bidi stream [client side]
In a bidirectional stream, the client and server both send messages to each other asynchronously. The client method takes a flow whose messages are streamed out to the server, and returns a flow that receives the messages the server sends for this RPC.
```kotlin
suspend fun makeBidiStreamRpc(rpcName: String, messages: Flow<JsonObject>): Flow<JsonObject> { /* … */ }
```
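For the bidirectional case the client has to pump the outgoing flow concurrently while handing back the incoming one. A sketch, assuming a CoroutineScope passed to the client for that sender job (the post's signature has no such parameter, so the real client presumably owns one), a hypothetical id/rpc/body envelope, and an end: true message in both directions to close a side. BidiClientCore is a hypothetical name.

```kotlin
import io.vertx.core.json.JsonObject
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical client core; `send` stands in for writing to the socket.
class BidiClientCore(private val scope: CoroutineScope, private val send: (JsonObject) -> Unit) {
    private val nextId = AtomicLong()
    private val pending = ConcurrentHashMap<Long, Channel<JsonObject>>()

    suspend fun makeBidiStreamRpc(rpcName: String, messages: Flow<JsonObject>): Flow<JsonObject> {
        val id = nextId.incrementAndGet()
        val incoming = Channel<JsonObject>(Channel.UNLIMITED)
        pending[id] = incoming
        // Stream outgoing messages concurrently; the caller gets the incoming flow right away.
        scope.launch {
            messages.collect { send(JsonObject().put("id", id).put("rpc", rpcName).put("body", it)) }
            send(JsonObject().put("id", id).put("rpc", rpcName).put("end", true)) // client side done
        }
        return incoming.receiveAsFlow()
            .takeWhile { !it.getBoolean("end", false) }       // server side done
            .map { it.getJsonObject("body") ?: JsonObject() }
            .onCompletion { pending.remove(id) }
    }

    fun onMessage(msg: JsonObject) {
        val id = msg.getLong("id") ?: return
        pending[id]?.trySend(msg)
    }
}
```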
Running handlers server side
Since service handlers are regular functions, and functions are first-class citizens in Kotlin, we can easily route messages coming in from the client over the network to the appropriate handlers.
The handlers are kept in hashmaps keyed by RPC name; together they form the service registry:
```kotlin
val clientStreamHandlers = HashMap<String, suspend (Flow<JsonObject>) -> JsonObject>()
```
When an RPC request comes in, the server looks up the matching handler and calls it:
```kotlin
suspend fun triggerUnaryRpc(rpcName: String, payload: JsonObject): JsonObject { /* … */ }
```
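On the server, a unary request is a map lookup followed by a call; the only extra work is copying the request ID into the reply so the client can route it. A sketch assuming a hypothetical id/rpc/body/error envelope; replying with an error for an unknown RPC name, rather than silently dropping the request, is a choice made here. UnaryDispatcher and handleRequest are hypothetical names.

```kotlin
import io.vertx.core.json.JsonObject
import kotlinx.coroutines.CancellationException

class UnaryDispatcher(private val unaryHandlers: Map<String, suspend (JsonObject) -> JsonObject>) {
    // Look up the handler by name and invoke it.
    suspend fun triggerUnaryRpc(rpcName: String, payload: JsonObject): JsonObject {
        val handler = unaryHandlers[rpcName]
            ?: throw IllegalArgumentException("no unary handler registered for '$rpcName'")
        return handler(payload)
    }

    // Turns one request envelope into one reply envelope carrying the same id.
    suspend fun handleRequest(request: JsonObject): JsonObject {
        val reply = JsonObject().put("id", request.getValue("id"))
        return try {
            val rpc = request.getString("rpc") ?: throw IllegalArgumentException("missing rpc name")
            reply.put("body", triggerUnaryRpc(rpc, request.getJsonObject("body") ?: JsonObject()))
        } catch (e: CancellationException) {
            throw e   // never swallow coroutine cancellation
        } catch (e: Exception) {
            reply.put("error", e.message ?: e.toString())
        }
    }
}
```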
For bidi stream messages there is a bit more bookkeeping to do, but nothing too complicated:
```kotlin
suspend fun handleBidiStreamMsg(header: RpcHeader, msg: JsonObject, socket: WriteStream<Buffer>) { /* … */ }
```
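The bidi bookkeeping could plausibly look like this: one inbound channel per open stream ID, created on the first message, with the handler launched once and its output flow piped back to the client. The RpcHeader fields are a guess since the post doesn't show the type, a plain send function replaces the WriteStream<Buffer> parameter so the sketch runs without a socket, and BidiSessions is a hypothetical name.

```kotlin
import io.vertx.core.json.JsonObject
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import java.util.concurrent.ConcurrentHashMap

// Hypothetical header shape; `end` marks the client's last message on this stream.
data class RpcHeader(val id: Long, val rpc: String, val end: Boolean = false)

class BidiSessions(
    private val scope: CoroutineScope,
    private val handlers: Map<String, suspend (Flow<JsonObject>) -> Flow<JsonObject>>,
    private val send: (JsonObject) -> Unit,   // stands in for socket.write(...)
) {
    // One inbound channel per open stream, keyed by the client's stream id.
    private val open = ConcurrentHashMap<Long, Channel<JsonObject>>()

    fun handleBidiStreamMsg(header: RpcHeader, msg: JsonObject?) {
        val inbound = open.computeIfAbsent(header.id) { id ->
            // First message for this id: create the channel and start the handler once.
            Channel<JsonObject>(Channel.UNLIMITED).also { ch -> startHandler(header.rpc, id, ch) }
        }
        if (msg != null) inbound.trySend(msg)
        if (header.end) {                     // client finished sending on this stream
            inbound.close()
            open.remove(header.id)
        }
    }

    private fun startHandler(rpc: String, id: Long, inbound: Channel<JsonObject>) {
        val handler = handlers[rpc]
        if (handler == null) {
            send(JsonObject().put("id", id).put("error", "unknown rpc '$rpc'"))
            return
        }
        scope.launch {
            // Pipe every handler output back to the client, then signal the end of this side.
            handler(inbound.consumeAsFlow()).collect { send(JsonObject().put("id", id).put("body", it)) }
            send(JsonObject().put("id", id).put("end", true))
        }
    }
}
```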
Conclusion
This project had been baking in the back of my mind for a long time, and I was relieved when I got it all coded up and working. It makes writing client-server code really easy.
The next step is to cluster the servers running this so that they can communicate with each other pub/sub or request-response style. This would most likely be done via Vert.x's clustered event bus, and the two combined should have really solid synergy.