Building a Vertx Cluster Manager
Introduction
It is common practice to deploy multiple copies of an application spread across multiple servers, sometimes even across multiple regions. This is mainly to take advantage of high availability - if one copy goes down [due to an error or during a code deploy] there are others available to service requests - as well as to spread the traffic load across multiple machines.
What happens when you start running multiple copies of multiple applications across multiple servers? The communication overhead increases significantly. Teams now need to figure out how inter-service communication should be structured. Will it be HTTP requests? If so, how will service discovery records be propagated across everything? What happens when servers go down?
The fundamental issue here is that the services are meant to work cooperatively, but their deployment and network topology keep them segregated. I have seen it countless times: a deployment has 3 servers with a load balancer in front, but the 3 servers cannot talk to their own peers. One pattern is to have all of them connect to a Redis instance and do pub-sub communication, but now you need a Redis server, and there is still no way to do a request-response paradigm. A good approach is one that opens multiple doors and solves multiple problems.
Once I was working on a block service that supported 'global' blocks. There were only a few of these, so we wanted to keep them in memory as well - but if a delete-block request came to one particular server, the others would not know about it and would keep serving stale data. Having the ability to talk to my own service's peers would have come in handy to publish the delete message to the other nodes as well.
What we need is the ability to cluster the individual servers into an integrated service mesh [not istio/linkerd] so that they are all part of one cohesive unit, yet each handles its own specific part of the application.
This would allow my service not only to talk to its own peers easily, but also to communicate with the other services that make up the larger application platform.
Vertx eventbus
Vertx provides a built-in event bus that can be used for request-response and pub-sub style communication. It is quite simple in a single-node scenario: you register a consumer lambda, then send messages to a topic, which invokes the previously registered lambda.
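A minimal sketch of that flow, assuming a Vertx instance is already in scope [the address name svc.echo and the handler bodies are just illustrative]:

```kotlin
val eventBus = vertx.eventBus()

// register consumer somewhere
eventBus.consumer<String>("svc.echo") { msg ->
  msg.reply("echo: ${msg.body()}")
}

// ...then send a message to the topic from somewhere else; the lambda
// registered above is invoked and its reply lands back here
eventBus.request<String>("svc.echo", "hello") { ar ->
  if (ar.succeeded()) println(ar.result().body()) // prints "echo: hello"
}
```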
In a single-node scenario I tend to advise against using the event bus - since all the code is within the same process, regular function calls serve you better.
However, what about the situation where your service is deployed to multiple nodes, and there are other nodes running other services that you may want to interact with? In those situations, using the clustered eventbus can make inter-service communication quite easy.
Vertx cluster manager
To use the clustered event bus, which is an eventbus that spans multiple machines, you must run Vertx in clustered mode using a ClusterManager. The cluster manager coordinates the eventbus topic registrations across the different servers, which Vertx then uses to figure out which server a message should go to. If the topic exists on the local node, the message is delivered locally; otherwise it is sent to the remote server over TCP.
Out of the box there are a few implementations of the cluster manager - Ignite, Hazelcast, and ZooKeeper - the problem is that all of them are bloated, slow, and suck.
Thankfully the ClusterManager is a regular interface that we can implement to create one that works for our requirements. This post will walk through how you can create your own cluster manager and plug it into Vertx to get clustered eventbus functionality easily.
I will run 'multiple servers' as different coroutines within the same process - slightly cheating, but they emulate the actual topology just fine, and the manager can scale out to different machines all the same. I will make sure to point out which information is local to a node and which needs to be global and accessible to all servers.
Implementing the cluster manager interface
To start, we need to create a class that implements the ClusterManager interface.
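Assuming Vert.x 4.x, the skeleton along with the imports we will need looks like this:

```kotlin
import io.vertx.core.Promise
import io.vertx.core.Vertx
import io.vertx.core.spi.cluster.ClusterManager
import io.vertx.core.spi.cluster.NodeInfo
import io.vertx.core.spi.cluster.NodeSelector
import io.vertx.core.spi.cluster.RegistrationInfo

class SuperAwesomeClusterManager : ClusterManager {
  // method implementations are filled in over the rest of this post
}
```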
Now we need to implement the required functions. Not all of them are needed for eventbus functionality, so we will skip those in this post - these include nodeListener, getSyncMap, getAsyncMap, getCounter, and getLockWithTimeout - we will leave them blank.
The first function that is called is the init function, which allows us to hold on to the running Vertx instance. No real functionality is required in this method other than keeping a reference to Vertx itself [and even that only if required].
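A sketch of init - in Vert.x 4.x it also receives the NodeSelector that routes eventbus traffic, so we keep a reference to that too:

```kotlin
class SuperAwesomeClusterManager : ClusterManager {

  private lateinit var vertx: Vertx
  private lateinit var nodeSelector: NodeSelector

  // nothing to do here beyond holding on to the references
  override fun init(vertx: Vertx, nodeSelector: NodeSelector) {
    this.vertx = vertx
    this.nodeSelector = nodeSelector
  }

  // ...rest of the methods
}
```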
The next call will be to the join method, which tells us that this node is joining the cluster and that we should perform any required housekeeping work now. For us this is simply a local boolean flag that we flip to true.
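A sketch - the flag lives on the manager instance, so it is local to the node:

```kotlin
class SuperAwesomeClusterManager : ClusterManager {

  @Volatile
  private var partOfCluster = false

  // this node is joining the cluster - no coordination needed in our
  // case, so flip the flag and complete the promise immediately
  override fun join(promise: Promise<Void>) {
    partOfCluster = true
    promise.complete()
  }
}
```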
Conversely, there is a leave method that is called when the cluster manager/Vertx is closed. This indicates that the server is leaving the cluster and that we should perform any cleanup work required. In this case we simply flip the partOfCluster boolean back to false.
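The mirror image of join:

```kotlin
class SuperAwesomeClusterManager : ClusterManager {

  // this node is leaving the cluster - flip the flag back
  override fun leave(promise: Promise<Void>) {
    partOfCluster = false
    promise.complete()
  }
}
```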
The interface also requires a method that returns whether this node is active or not. In our case it is functionally equivalent to a getter on the partOfCluster boolean. All in all, the methods around joining and leaving look as follows.
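```kotlin
class SuperAwesomeClusterManager : ClusterManager {

  @Volatile
  private var partOfCluster = false

  override fun join(promise: Promise<Void>) {
    partOfCluster = true
    promise.complete()
  }

  override fun leave(promise: Promise<Void>) {
    partOfCluster = false
    promise.complete()
  }

  // vertx uses this to check whether the node is part of the cluster
  override fun isActive(): Boolean = partOfCluster
}
```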
There is one more function required, clusterHost(), which needs to return the IP of the host that should be used for communication purposes. I am sure there are many ways to get your own IP, but the easiest way I know is to use UDP socket functionality: create a 'request' towards Google's DNS server and grab the local address of the datagram socket.
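A sketch of the idea, shown here with the JDK's datagram socket to keep the snippet short. 'Connecting' the socket makes the OS pick the outbound network interface without actually sending a packet:

```kotlin
import java.net.DatagramSocket
import java.net.InetAddress

class SuperAwesomeClusterManager : ClusterManager {

  override fun clusterHost(): String =
    DatagramSocket().use { socket ->
      // no packet is sent - connect() just resolves the route, and the
      // socket's local address is the IP our peers can reach us on
      socket.connect(InetAddress.getByName("8.8.8.8"), 53)
      socket.localAddress.hostAddress
    }
}
```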
The next set of methods to implement revolves around the node itself: creating a unique node ID, setting/getting node info, and getting a list of the nodes in the cluster. The first step is to assign a unique node id to the server itself and return it from the getNodeId method.
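A random UUID generated at construction time does the job:

```kotlin
import java.util.UUID

class SuperAwesomeClusterManager : ClusterManager {

  // generated once per manager instance, so every node gets a unique id
  private val nodeId = UUID.randomUUID().toString()

  override fun getNodeId(): String = nodeId
}
```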
The next three methods - setNodeInfo, getNodes, and getNodeInfo - all need access to state that is global to all the nodes in the cluster. For now it will just be an in-memory hashmap, but for a ClusterManager that needs to span multiple machines an in-memory hashmap will not work. Up until now everything has been local; this is the first piece of information that needs to be shared across nodes/servers.

- setNodeInfo stores the node info with the nodeId as the key, so that it can be looked up later by node id
- getNodeInfo returns the NodeInfo for the nodeId provided
- getNodes returns a list of all the nodeIds that are currently part of the cluster
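A sketch with the shared store as an in-process singleton [fine here because all our 'nodes' live in one process; a real deployment would back this with something external]:

```kotlin
import java.util.concurrent.ConcurrentHashMap

// some global store that is accessible by all nodes in your cluster
object GlobalState {
  val nodeInfos = ConcurrentHashMap<String, NodeInfo>()
}

class SuperAwesomeClusterManager : ClusterManager {

  private val nodeId = UUID.randomUUID().toString()

  @Volatile
  private var localNodeInfo: NodeInfo? = null

  // store our own info keyed by nodeId so peers can look it up later
  override fun setNodeInfo(nodeInfo: NodeInfo, promise: Promise<Void>) {
    localNodeInfo = nodeInfo
    GlobalState.nodeInfos[nodeId] = nodeInfo
    promise.complete()
  }

  // this node's own info
  override fun getNodeInfo(): NodeInfo = localNodeInfo!!

  // a peer's info, looked up by its nodeId
  override fun getNodeInfo(nodeId: String, promise: Promise<NodeInfo>) {
    val info = GlobalState.nodeInfos[nodeId]
    if (info != null) promise.complete(info) else promise.fail("unknown node: $nodeId")
  }

  // every nodeId currently in the cluster
  override fun getNodes(): List<String> = GlobalState.nodeInfos.keys.toList()
}
```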
All of this is great, but where is all the eventbus information saved? Patience, grasshopper.
Now that we have all the node information accessible across the servers, we can start keeping track of where each eventbus address is registered. The three functions we need to implement are:

- addRegistration, which connects an eventbus address to a RegistrationInfo object
- removeRegistration, which removes a RegistrationInfo previously assigned to an eventbus address
- getRegistrations, which returns the list of RegistrationInfos for a given eventbus address

You might notice from the signatures that a single address can have multiple registration infos - because a single topic can be registered on multiple machines.

The address-to-registration-info map is the second piece of information that needs to be globally accessible across all the nodes in the cluster. The full cluster manager [with the non-eventbus methods left blank] looks something like the following.
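A minimal sketch of the whole manager, assuming Vert.x 4.x [note: a production-grade manager would also push registration updates to each node's NodeSelector; for this demo the lazy getRegistrations lookup is enough]:

```kotlin
import io.vertx.core.Promise
import io.vertx.core.Vertx
import io.vertx.core.shareddata.AsyncMap
import io.vertx.core.shareddata.Counter
import io.vertx.core.shareddata.Lock
import io.vertx.core.spi.cluster.ClusterManager
import io.vertx.core.spi.cluster.NodeInfo
import io.vertx.core.spi.cluster.NodeListener
import io.vertx.core.spi.cluster.NodeSelector
import io.vertx.core.spi.cluster.RegistrationInfo
import java.net.DatagramSocket
import java.net.InetAddress
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

// some global store that is accessible by all nodes in your cluster
object GlobalState {
  val nodeInfos = ConcurrentHashMap<String, NodeInfo>()
  val registrations = ConcurrentHashMap<String, MutableSet<RegistrationInfo>>()
}

class SuperAwesomeClusterManager : ClusterManager {

  private val nodeId = UUID.randomUUID().toString()

  @Volatile private var partOfCluster = false
  @Volatile private var localNodeInfo: NodeInfo? = null

  private lateinit var vertx: Vertx
  private lateinit var nodeSelector: NodeSelector

  override fun init(vertx: Vertx, nodeSelector: NodeSelector) {
    this.vertx = vertx
    this.nodeSelector = nodeSelector
  }

  override fun join(promise: Promise<Void>) { partOfCluster = true; promise.complete() }
  override fun leave(promise: Promise<Void>) { partOfCluster = false; promise.complete() }
  override fun isActive(): Boolean = partOfCluster
  override fun getNodeId(): String = nodeId

  override fun clusterHost(): String = DatagramSocket().use { socket ->
    socket.connect(InetAddress.getByName("8.8.8.8"), 53)
    socket.localAddress.hostAddress
  }

  override fun setNodeInfo(nodeInfo: NodeInfo, promise: Promise<Void>) {
    localNodeInfo = nodeInfo
    GlobalState.nodeInfos[nodeId] = nodeInfo
    promise.complete()
  }

  override fun getNodeInfo(): NodeInfo = localNodeInfo!!

  override fun getNodeInfo(nodeId: String, promise: Promise<NodeInfo>) {
    val info = GlobalState.nodeInfos[nodeId]
    if (info != null) promise.complete(info) else promise.fail("unknown node: $nodeId")
  }

  override fun getNodes(): List<String> = GlobalState.nodeInfos.keys.toList()

  // connect an eventbus address to a RegistrationInfo
  override fun addRegistration(address: String, registrationInfo: RegistrationInfo, promise: Promise<Void>) {
    GlobalState.registrations
      .computeIfAbsent(address) { ConcurrentHashMap.newKeySet() }
      .add(registrationInfo)
    promise.complete()
  }

  // remove the RegistrationInfo assigned to an eventbus address
  override fun removeRegistration(address: String, registrationInfo: RegistrationInfo, promise: Promise<Void>) {
    GlobalState.registrations[address]?.remove(registrationInfo)
    promise.complete()
  }

  // every RegistrationInfo for the given eventbus address
  override fun getRegistrations(address: String, promise: Promise<List<RegistrationInfo>>) {
    promise.complete(GlobalState.registrations[address].orEmpty().toList())
  }

  // non-eventbus methods - not needed for this post, left blank
  override fun <K, V> getAsyncMap(name: String, promise: Promise<AsyncMap<K, V>>) {}
  override fun <K, V> getSyncMap(name: String): MutableMap<K, V> = ConcurrentHashMap()
  override fun getLockWithTimeout(name: String, timeout: Long, promise: Promise<Lock>) {}
  override fun getCounter(name: String, promise: Promise<Counter>) {}
  override fun nodeListener(listener: NodeListener) {}
}
```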
Using the cluster manager
To use the cluster manager we need to start Vertx in clustered mode, passing an instance of the cluster manager in the Vertx options.
Here we simulate a process that registers a handler responding to messages sent to the topic svc.one.
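A sketch, assuming the coAwait() extension from vertx-lang-kotlin-coroutines [on older 4.x versions the equivalent is await()]:

```kotlin
import io.vertx.core.Vertx
import io.vertx.core.VertxOptions
import io.vertx.kotlin.coroutines.coAwait

suspend fun processOne() {
  println("starting process one")
  // start this 'server' in clustered mode with our cluster manager
  val options = VertxOptions().setClusterManager(SuperAwesomeClusterManager())
  val vertx = Vertx.clusteredVertx(options).coAwait()
  vertx.eventBus().consumer<String>("svc.one") { msg ->
    msg.reply("response from process one")
  }
}
```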
Process two will be the requester, and there will be another process, three, that also has the topic registered.
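Process two fires a handful of requests at svc.one; process three is a copy of process one with a different reply string:

```kotlin
suspend fun processTwo() {
  println("starting process two")
  val options = VertxOptions().setClusterManager(SuperAwesomeClusterManager())
  val vertx = Vertx.clusteredVertx(options).coAwait()
  repeat(4) {
    val reply = vertx.eventBus().request<String>("svc.one", "hello").coAwait()
    println("got reply: ${reply.body()}")
  }
}

suspend fun processThree() {
  println("starting process three")
  val options = VertxOptions().setClusterManager(SuperAwesomeClusterManager())
  val vertx = Vertx.clusteredVertx(options).coAwait()
  vertx.eventBus().consumer<String>("svc.one") { msg ->
    msg.reply("response from process three")
  }
}
```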
The main function orchestrates the three 'processes' as coroutines.
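Something along these lines - the delay is a crude way to make sure both consumers are registered before the requests start flying:

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

suspend fun main(): Unit = coroutineScope {
  launch { processOne() }
  launch { processThree() }
  delay(1000) // let the two consumers register their topics first
  launch { processTwo() }
}
```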
The final output of the above code looks something like this [the exact interleaving can vary from run to run].
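```
starting process one
starting process three
starting process two
got reply: response from process one
got reply: response from process three
got reply: response from process one
got reply: response from process three
```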
You can see that the requests are automatically round-robined across the servers that have the address registered, giving us load balancing across nodes for free as well.
Conclusion
The clustered eventbus is one of the best features of Vertx, and it greatly simplifies communication between services. It would get more use if simpler cluster managers were available that worked better with today's software deployment strategies.