Building a Vertx Cluster Manager

Introduction

It is common practice to deploy multiple copies of an application spread across multiple servers, sometimes even across multiple regions. This is mainly to take advantage of high availability - if one copy goes down [due to an error or during a code deploy] there are others available servicing requests - as well as to spread out the traffic load across multiple machines.

What happens when you start getting multiple copies of multiple applications across multiple servers? The communication overhead increases significantly. Teams now need to figure out how inter-service communication should be structured. Will it be HTTP requests? If so, how will service discovery records be propagated across everything? What happens when servers go down?

The fundamental issue here is that the services are meant to work cooperatively, but their deployment and network topology keep them segregated. I have seen countless deployments that have 3 servers with a load balancer in front, where the 3 servers cannot talk to their own peers. One pattern is to have all of them connect to a Redis instance and do pub-sub communication, but now you need a Redis server, and pub-sub on its own gives you no request-response paradigm. A good approach is one that opens multiple doors and solves multiple problems.

Once I was working on a block service where we supported ‘global’ blocks. There were only a few of these, so we wanted to keep them in memory as well. But if a delete block request came to a particular server, the others would not know about it and would keep serving stale data. Being able to talk to my own service’s peers would have come in handy to publish the delete message to the other nodes as well.

What we need is the ability to cluster the individual servers as an integrated service mesh [not istio/linkerd] so that they are all part of one cohesive unit yet handling their own specific part of the application.

This would allow my service not only to talk to its own peers easily, but also to communicate with the other services that make up my larger application platform.

Vertx eventbus

Vertx provides a built-in event bus that can be used for request-response and pub-sub style communication. It is quite simple in a single node scenario - you register a consumer lambda on a topic and then send messages to that topic, which invokes the lambda registered previously.

// register consumer somewhere
vertx.eventBus().consumer<String>("svc.one").handler {
    it.reply("hello from svc.one")
}
// make request from elsewhere
vertx.eventBus().request<String>("svc.one", "hello")

In a single node scenario, I tend to advise against using the event bus: all the code is within the same process, so regular function calls are simpler.

However, what about the situation where your service is deployed to multiple nodes - and there are other nodes with other services that you may want to interact with? In those situations using the clustered eventbus can make inter-service communication quite easy.

Vertx cluster manager

To use the clustered event bus, which is an eventbus that spans multiple machines, you must run Vertx in clustered mode with a ClusterManager. The cluster manager coordinates the eventbus topic registrations across the different servers, and Vertx itself then uses those registrations to figure out which server a message should go to. If the chosen registration lives on the local node, the message is delivered in-process; otherwise it is sent to the remote server over TCP.
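
As a rough mental model of the local-or-remote decision described above, here is a small stand-alone sketch. It is not how Vertx implements it internally; the names `Route`, `Local`, `Remote`, `NoHandlers`, `AddressRegistry`, and `routeFor` are hypothetical and exist only for this illustration.

```kotlin
// Hypothetical model of the routing decision; none of these names are Vertx APIs.
sealed interface Route
data class Local(val address: String) : Route
data class Remote(val address: String, val nodeId: String) : Route
object NoHandlers : Route

// address -> ids of the nodes that registered a consumer for it
typealias AddressRegistry = Map<String, List<String>>

fun routeFor(address: String, selfNodeId: String, registry: AddressRegistry): Route {
    val candidates = registry[address].orEmpty()
    // Pick the first candidate to keep the sketch short; a real selector would rotate.
    val chosen = candidates.firstOrNull() ?: return NoHandlers
    // Same node: deliver in-process. Different node: the message has to cross the network.
    return if (chosen == selfNodeId) Local(address) else Remote(address, chosen)
}
```

With a registry of `mapOf("svc.one" to listOf("node-a"))`, a call from `node-a` resolves to `Local`, while a call from `node-b` resolves to `Remote` pointing at `node-a`.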

Out of the box, there are a few implementations of the cluster manager, such as Ignite, Hazelcast, and ZooKeeper. The problem, in my opinion, is that all of them are bloated and slow, and they suck.

Thankfully the ClusterManager is a regular interface that we can implement to create one that works for our requirements. This post will walk through how you can create your own cluster manager and plug it into Vertx to get clustered eventbus functionality easily.

I will run ‘multiple servers’ as different coroutines within the same process - so it is slightly cheating but they will emulate the actual topology just fine. The manager can scale out to different machines all the same. I will make sure to point out what information is local to the node and what information needs to be global and accessible to all servers.

Implementing the cluster manager interface

To start with we need to create a class that implements the ClusterManager interface

class SuperAwesomeClusterManager : ClusterManager {
}

Now we need to implement the required functions. Not all of them are needed for the eventbus functionality, so we will skip those in this post. These include nodeListener, getSyncMap, getAsyncMap, getCounter, and getLock - we will leave them blank.

The first function that is called is the init function, which allows us to hold on to the running vertx instance and the node selector. No real functionality is required in this method other than holding those references [and even that only if required].

class SuperAwesomeClusterManager : ClusterManager {
    private lateinit var vertx: Vertx
    private lateinit var nodeSelector: NodeSelector

    override fun init(vertx: Vertx, nodeSelector: NodeSelector) {
        this.vertx = vertx
        this.nodeSelector = nodeSelector
    }
}

The next call will be to the join method, which tells us that this node is joining the cluster and that we should perform any required housekeeping work now. For us this will simply be a local boolean flag that we flip to true.

class SuperAwesomeClusterManager : ClusterManager {
    private lateinit var vertx: Vertx
    private lateinit var nodeSelector: NodeSelector
    private var partOfCluster = false

    override fun init(vertx: Vertx, nodeSelector: NodeSelector) {
        this.vertx = vertx
        this.nodeSelector = nodeSelector
    }

    override fun join(promise: Promise<Void>) {
        this.partOfCluster = true
        promise.tryComplete()
    }
}

Conversely, there is a leave method that will be called when the cluster manager/vertx is closed. This indicates that this server is leaving the cluster and that we should perform any cleanup work required. In this case we will simply flip the partOfCluster boolean back to false.

class SuperAwesomeClusterManager : ClusterManager {
    private lateinit var vertx: Vertx
    private lateinit var nodeSelector: NodeSelector
    private var partOfCluster = false

    override fun init(vertx: Vertx, nodeSelector: NodeSelector) {
        this.vertx = vertx
        this.nodeSelector = nodeSelector
    }

    override fun join(promise: Promise<Void>) {
        this.partOfCluster = true
        promise.tryComplete()
    }

    override fun leave(promise: Promise<Void>) {
        this.partOfCluster = false
        promise.tryComplete()
    }
}

The interface also requires a method that returns whether this node is active or not. In our case it is functionally equivalent to a getter on the partOfCluster boolean. All in all, the methods around joining and leaving look as follows:

class SuperAwesomeClusterManager : ClusterManager {
    private lateinit var vertx: Vertx
    private lateinit var nodeSelector: NodeSelector
    private var partOfCluster = false

    override fun init(vertx: Vertx, nodeSelector: NodeSelector) {
        this.vertx = vertx
        this.nodeSelector = nodeSelector
    }

    override fun join(promise: Promise<Void>) {
        this.partOfCluster = true
        promise.tryComplete()
    }

    override fun leave(promise: Promise<Void>) {
        this.partOfCluster = false
        promise.tryComplete()
    }

    override fun isActive(): Boolean {
        return this.partOfCluster
    }
}

One more required function is clusterHost(), which needs to return the IP of the host that should be used for communication purposes. I am sure there are many ways to get your own IP, but the easiest way I know is to open a UDP socket with the JDK’s DatagramSocket, ‘connect’ it to Google’s DNS server, and grab the local address of the socket. Connecting a UDP socket does not send any packet; it only makes the OS pick the outgoing interface.

class SuperAwesomeClusterManager : ClusterManager {
    private lateinit var vertx: Vertx
    private lateinit var nodeSelector: NodeSelector
    private var partOfCluster = false

    override fun init(vertx: Vertx, nodeSelector: NodeSelector) {
        this.vertx = vertx
        this.nodeSelector = nodeSelector
    }

    override fun join(promise: Promise<Void>) {
        this.partOfCluster = true
        promise.tryComplete()
    }

    override fun leave(promise: Promise<Void>) {
        this.partOfCluster = false
        promise.tryComplete()
    }

    override fun isActive(): Boolean {
        return this.partOfCluster
    }

    override fun clusterHost(): String {
        DatagramSocket().use {
            it.connect(InetAddress.getByName("8.8.8.8"), 12345)
            return it.localAddress.hostAddress
        }
    }
}
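
On a machine with no route to the probe address, the socket trick above can fail. A hedged variation that falls back to the loopback address might look like the sketch below. The helper name `detectLocalAddress`, its parameters, and the fallback behaviour are my own assumptions for illustration, not something Vertx requires.

```kotlin
import java.net.DatagramSocket
import java.net.InetAddress

// Hypothetical helper: returns the address the OS would use to reach `probeHost`,
// or the loopback address when the probe fails (e.g. on an offline machine).
fun detectLocalAddress(probeHost: String = "8.8.8.8", probePort: Int = 53): String =
    try {
        DatagramSocket().use { socket ->
            // connect() on a UDP socket only selects a route; no packet is sent.
            socket.connect(InetAddress.getByName(probeHost), probePort)
            socket.localAddress.hostAddress
        }
    } catch (e: Exception) {
        InetAddress.getLoopbackAddress().hostAddress
    }
```

clusterHost() could then simply return `detectLocalAddress()`. Whether loopback is an acceptable fallback depends on the deployment; across real machines it would usually be better to fail loudly or read the address from configuration.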

The next set of methods to implement revolves around the node itself: creating a unique node id, setting/getting node info, and getting a list of the nodes in the cluster.

The first step is to assign a unique node id to the server itself and return it in the getNodeId method.

class SuperAwesomeClusterManager : ClusterManager {
    private lateinit var vertx: Vertx
    private lateinit var nodeSelector: NodeSelector
    private var partOfCluster = false
    private var myNodeId = UUID.randomUUID().toString()

    override fun init(vertx: Vertx, nodeSelector: NodeSelector) {
        this.vertx = vertx
        this.nodeSelector = nodeSelector
    }

    override fun join(promise: Promise<Void>) {
        this.partOfCluster = true
        promise.tryComplete()
    }

    override fun leave(promise: Promise<Void>) {
        this.partOfCluster = false
        promise.tryComplete()
    }

    override fun isActive(): Boolean {
        return this.partOfCluster
    }

    override fun clusterHost(): String {
        DatagramSocket().use {
            it.connect(InetAddress.getByName("8.8.8.8"), 12345)
            return it.localAddress.hostAddress
        }
    }

    override fun getNodeId(): String {
        return this.myNodeId
    }
}

The next three methods, setNodeInfo, getNodes, and getNodeInfo, all need access to state that should be global to all the nodes in the cluster. For now it will just be an in-memory hashmap, but for a ClusterManager that needs to span multiple machines an in-memory hashmap will not work. This is the first piece of information so far that needs to be shared across nodes/servers.

setNodeInfo stores this node’s info with its nodeId as the key so that it can be looked up later
getNodeInfo returns the NodeInfo stored for a nodeId [the no-argument variant returns this node’s own]
getNodes returns a list of all the nodeIds that are currently part of the cluster

// some global store that is accessible by all nodes in your cluster
val nodeMap = mutableMapOf<String, NodeInfo>()

class SuperAwesomeClusterManager : ClusterManager {
    // information local to the node
    private lateinit var vertx: Vertx
    private lateinit var nodeSelector: NodeSelector
    private var partOfCluster = false
    private var myNodeId = UUID.randomUUID().toString()

    override fun init(vertx: Vertx, nodeSelector: NodeSelector) {
        this.vertx = vertx
        this.nodeSelector = nodeSelector
    }

    override fun join(promise: Promise<Void>) {
        this.partOfCluster = true
        promise.tryComplete()
    }

    override fun leave(promise: Promise<Void>) {
        this.partOfCluster = false
        promise.tryComplete()
    }

    override fun isActive(): Boolean {
        return this.partOfCluster
    }

    override fun clusterHost(): String {
        DatagramSocket().use {
            it.connect(InetAddress.getByName("8.8.8.8"), 12345)
            return it.localAddress.hostAddress
        }
    }

    override fun getNodeId(): String {
        return this.myNodeId
    }

    override fun setNodeInfo(nodeInfo: NodeInfo, promise: Promise<Void>) {
        nodeMap[this.myNodeId] = nodeInfo
        promise.tryComplete()
    }

    // this node's own info
    override fun getNodeInfo(): NodeInfo? {
        return nodeMap[this.myNodeId]
    }

    // info for any node, looked up by node id [overload as declared in the Vert.x 4 interface]
    override fun getNodeInfo(nodeId: String, promise: Promise<NodeInfo>) {
        val info = nodeMap[nodeId]
        if (info != null) promise.tryComplete(info) else promise.tryFail("unknown node $nodeId")
    }

    override fun getNodes(): List<String> {
        return nodeMap.keys.toList()
    }
}
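
One way to keep the "global" part swappable is to hide it behind a small interface, so the in-memory map used here can later be replaced by something every machine can reach. The sketch below is stand-alone and makes assumptions: `PeerInfo` is a stand-in for Vertx's NodeInfo, and `NodeStore` / `InMemoryNodeStore` are hypothetical names, not Vertx APIs.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Stand-in for Vertx's NodeInfo so the sketch compiles on its own.
data class PeerInfo(val host: String, val port: Int)

// Hypothetical abstraction over "state every node must be able to see".
interface NodeStore {
    fun put(nodeId: String, info: PeerInfo)
    fun get(nodeId: String): PeerInfo?
    fun remove(nodeId: String)
    fun nodeIds(): List<String>
}

// In-process implementation, good enough for a single-process demo.
class InMemoryNodeStore : NodeStore {
    private val nodes = ConcurrentHashMap<String, PeerInfo>()

    override fun put(nodeId: String, info: PeerInfo) { nodes[nodeId] = info }
    override fun get(nodeId: String): PeerInfo? = nodes[nodeId]
    override fun remove(nodeId: String) { nodes.remove(nodeId) }
    override fun nodeIds(): List<String> = nodes.keys.toList()
}
```

A cluster manager that spans real machines would implement the same four operations against a shared store instead, and leave() would be a natural place to call `remove` for the departing node.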

All of this is great, but where is all the eventbus information saved? Patience, grasshopper.

Now that we have all the node information accessible across the servers we can start keeping track of where each eventbus address is registered.

The three functions we need to implement are:

addRegistration, which connects an eventbus address to a RegistrationInfo object
removeRegistration, which removes the RegistrationInfo if assigned to an eventbus address
getRegistrations, which returns a list of RegistrationInfo for a given eventbus address

You might notice from the signature that a single address can have multiple registration infos - because a single topic can be registered on multiple machines.

The address to registration info map is the second piece of information that needs to be globally accessible across all the nodes in the cluster. The full cluster manager [with the non-eventbus methods omitted] will look something like this:

import io.vertx.core.Promise
import io.vertx.core.Vertx
import io.vertx.core.spi.cluster.ClusterManager
import io.vertx.core.spi.cluster.NodeInfo
import io.vertx.core.spi.cluster.NodeSelector
import io.vertx.core.spi.cluster.RegistrationInfo
import java.net.DatagramSocket
import java.net.InetAddress
import java.util.UUID

// some global store that is accessible by all nodes in your cluster
// [plain maps keep the demo short; they are not thread-safe]
val nodeMap = mutableMapOf<String, NodeInfo>()
val regInfo = mutableMapOf<String, MutableList<RegistrationInfo>>()

class SuperAwesomeClusterManager : ClusterManager {
    // information local to the node
    private lateinit var vertx: Vertx
    private lateinit var nodeSelector: NodeSelector
    private var partOfCluster = false
    private var myNodeId = UUID.randomUUID().toString()

    override fun init(vertx: Vertx, nodeSelector: NodeSelector) {
        this.vertx = vertx
        this.nodeSelector = nodeSelector
    }

    override fun join(promise: Promise<Void>) {
        this.partOfCluster = true
        promise.tryComplete()
    }

    override fun leave(promise: Promise<Void>) {
        this.partOfCluster = false
        promise.tryComplete()
    }

    override fun isActive(): Boolean {
        return this.partOfCluster
    }

    override fun clusterHost(): String {
        DatagramSocket().use {
            it.connect(InetAddress.getByName("8.8.8.8"), 12345)
            return it.localAddress.hostAddress
        }
    }

    override fun getNodeId(): String {
        return this.myNodeId
    }

    override fun setNodeInfo(nodeInfo: NodeInfo, promise: Promise<Void>) {
        nodeMap[this.myNodeId] = nodeInfo
        promise.tryComplete()
    }

    // this node's own info
    override fun getNodeInfo(): NodeInfo? {
        return nodeMap[this.myNodeId]
    }

    // info for any node, looked up by node id [overload as declared in the Vert.x 4 interface]
    override fun getNodeInfo(nodeId: String, promise: Promise<NodeInfo>) {
        val info = nodeMap[nodeId]
        if (info != null) promise.tryComplete(info) else promise.tryFail("unknown node $nodeId")
    }

    override fun getNodes(): List<String> {
        return nodeMap.keys.toList()
    }

    override fun addRegistration(address: String, registrationInfo: RegistrationInfo, promise: Promise<Void>) {
        regInfo.computeIfAbsent(address) { mutableListOf() }.add(registrationInfo)
        promise.tryComplete()
    }

    override fun removeRegistration(address: String, registrationInfo: RegistrationInfo, promise: Promise<Void>) {
        regInfo[address]?.remove(registrationInfo)
        promise.tryComplete()
    }

    override fun getRegistrations(address: String, promise: Promise<MutableList<RegistrationInfo>>) {
        promise.tryComplete(regInfo[address] ?: mutableListOf())
    }
}
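
The plain mutable maps above are touched from several threads once more than one Vertx instance shares them. A hedged, stand-alone sketch of a thread-safe registration store follows. `Registration` is a stand-in for Vertx's RegistrationInfo, and `RegistrationStore` with its methods is a hypothetical name of mine; the `removeNode` cleanup is an addition that the cluster manager above does not perform.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList

// Stand-in for Vertx's RegistrationInfo; only the owning node id matters here.
data class Registration(val nodeId: String, val seq: Long)

// Hypothetical thread-safe replacement for the plain `regInfo` map.
class RegistrationStore {
    private val byAddress = ConcurrentHashMap<String, CopyOnWriteArrayList<Registration>>()

    fun add(address: String, registration: Registration) {
        byAddress.computeIfAbsent(address) { CopyOnWriteArrayList<Registration>() }.add(registration)
    }

    fun remove(address: String, registration: Registration) {
        byAddress[address]?.remove(registration)
    }

    // Returns a snapshot, so callers never observe later mutations.
    fun get(address: String): List<Registration> = byAddress[address]?.toList().orEmpty()

    // Drops everything a node registered, e.g. when it leaves the cluster.
    fun removeNode(nodeId: String) {
        byAddress.values.forEach { list -> list.removeIf { it.nodeId == nodeId } }
    }
}
```

CopyOnWriteArrayList suits this shape of workload because registrations change rarely while lookups happen on every message.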

Using the cluster manager

To use the cluster manager we need to start Vertx in clustered mode by passing an instance of the cluster manager into the vertx options.

Here we simulate a process that registers a handler that responds to messages sent to the topic svc.one

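import io.vertx.core.Vertx
import io.vertx.core.VertxOptions
import io.vertx.kotlin.coroutines.await
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

suspend fun processOne() {
    println("starting process one")
    val vertxOptions = VertxOptions().setClusterManager(SuperAwesomeClusterManager())
    val vertx = Vertx.clusteredVertx(vertxOptions).await()
    vertx.eventBus().consumer<String>("svc.one").handler {
        it.reply("hello from svc.one process one")
    }
}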

Process two will be the requester, and process three will be another node that also has the topic registered

suspend fun processTwo() {
    println("starting process two")
    val vertxOptions = VertxOptions().setClusterManager(SuperAwesomeClusterManager())
    val vertx = Vertx.clusteredVertx(vertxOptions).await()
    delay(3000)
    // send a message to svc.one 3 times
    repeat(3) {
        println(vertx.eventBus().request<String>("svc.one", "hello").await().body())
    }
}

suspend fun processThree() {
    println("starting process three")
    val vertxOptions = VertxOptions().setClusterManager(SuperAwesomeClusterManager())
    val vertx = Vertx.clusteredVertx(vertxOptions).await()
    vertx.eventBus().consumer<String>("svc.one").handler {
        it.reply("hello from svc.one process three")
    }
}

The main function orchestrates the three ‘processes’ as coroutines

suspend fun main() {
    val scope = CoroutineScope(Dispatchers.IO)
    scope.launch { processOne() }
    delay(1_000)
    scope.launch { processTwo() }
    delay(1_000)
    scope.launch { processThree() }
}

The final output of the above code is

starting process one
starting process two
starting process three
hello from svc.one process one
hello from svc.one process three
hello from svc.one process one

You can see that the requests are automatically round-robined across the servers that have the address registered, giving us load balancing across nodes for free as well.
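
The rotation seen in that output can be pictured with a tiny stand-alone round-robin picker. This is only an illustration of the idea; Vertx's own node selector is more involved, and the `RoundRobin` class below is a hypothetical name of mine.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical round-robin picker over a fixed list of targets.
class RoundRobin<T>(private val targets: List<T>) {
    private val next = AtomicInteger(0)

    // Returns the targets in rotation, or null when there is nothing to pick from.
    fun pick(): T? {
        if (targets.isEmpty()) return null
        // floorMod keeps the index non-negative even after the counter overflows.
        val index = Math.floorMod(next.getAndIncrement(), targets.size)
        return targets[index]
    }
}
```

Three picks over `listOf("process one", "process three")` yield process one, process three, process one, which is the same order as the replies printed above.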

Conclusion

The clustered eventbus is one of the best features of Vertx. It greatly simplifies communication between services. It would get more use if there were simpler cluster managers available that would work better with today’s software deployment strategies.