Reliable Distributed Communication with JGroups
We were looking for an alternative to Java Messaging Service (JMS) for a project requirement – I had had my fair share of unpleasant experiences with JMS from the past. With JMS, most of which had to do with housekeeping involved in the event of a broker outage. Issue at hand The solution we were working on was a legacy web-based application, which was being enabled for a cluster. Scalability, it seems, was not designed into the system. The solution, therefore, demanded making the system scalable with a quick-turnaround, minimal code changes, and of course, if not positive, then no negative impact on the application throughput. The main challenge was the presence of a programmatic cache (using core Java data structures), for some specific requirements, which of course, needed to be made cluster-aware. Data consistency is the primary concern in such cases, because the application pages can be serviced by any potentially any node. In this post, I want to talk about how we achieved the same – since such a scenario of different nodes needing to “talk” can arise in many a applications of the present day – as distributed computing is becoming more and more popular. Analysis When we say cluster-aware we imply that the clustered application, might need to talk to its peers at various junctures. It might or might not decide to perform an operation based on those interactions – but the conversation does need to occur. There are of course several options today to facilitate the same – JMS, Hazelcast, JGroups, etc. JMS requires the presence of an active broker process at all times. The housekeeping involved in broker outages, especially when message persistence is enabled (for reliability), becomes a problem. Solutions like Hazelcast required us to modify the existing code so as to use the data structures provided by them. This wasn't feasible because we wanted to minimize the code changes and rather work out a solution on top of the existing code base. We settled for JGroups because it seemed to address all the concerns:
there is no active process required beforehand, but rather, each node either joins an existing communication session, or else, starts one of its own. This means that there's no single point of failure (SPOF)
message reliability, ordering and flow control are all configurable
code changes could be contained, because it involved minor modifications.
JGroups is a reliable IP toolkit that is based upon a flexible protocol stack. IP , or broadcast, implies sending a message to multiple recipients. Traditionally, multicast communication is UDP (User Datagram Protocol) based, and is unreliable. For instance, think of a streaming video from YouTube – where once the streaming starts, little does it matter that a few frames are missed or jagged. Such are typically unreliable UDP based transmissions.
JGroups addresses the shortcomings of traditional IP multicast by adding aspects like reliability and membership to it.
Flexible protocol stack implies that there are options to tailor JGroups behaviour according to the application requirements. For example, aspects like Failure Detection, Encryption, etc., can be configured.
Design Conceptually, the design was very simple: the caches on each node needed to “talk” to other nodes, whenever a significant operation occurred. Since we were dealing with user-defined entities, these operations mostly involved some kind of update of these entities. Since each of the node needed a JGroups handle in order to be able to talk, we invoked a local JGroups session on each. Through the concept of groups and membership, a JGroups session looks for other active sessions when it comes alive. Consequently, members belonging to the same group are aware of all the peers. For example: Suppose node n1 comes up as the very first node, and initializes a JGroups session, creating a group called ProjectXGroup. Any subsequent node, say, n2, n3, also does the same, but in its case, it find an active ProjectXGroup already in place, and hence joins it, rather than creating a new session. Thus, each node (peer) is aware of all the members in that form the group. JGroups, by design, provides failure detection (FD) – that any node that goes down is removed from the list of active members, thus ensuring that at any given point of time, all the group members are aware of the current state of the group. Any local update of one or more entities required the change to be propagated to all the peers. We identified that this should be a 3-step process. Given an entity EntityX, the following needed to be done on all the nodes:
Lock EntityX
Update ExtityX
Unlock EntityX
We also needed to ensure that failure to perform any of these operations should result in a rollback, which also needed to be taken care of. For any given operation, we subdivided it into smaller tasks. An example of a task could be LockTask, UpdateTask, UnlockTask, etc. It is these tasks that would be the language of communication. Typically a task would have taskID, information about the entities involved, status flag (SCHEDULED, SUCCESSFUL, FAILED..), etc.
JGroups has options to either broadcast a message or unicast it. Since our implementation required that caches on all the peers be consistent, a sender broadcasts a task to all the peers. Each peer, would then perform the task, and report the outcome back to the sender. Continuing with our earlier example, given a task T1 which the involves the following sub-tasks on an entity known as E1:
Lock E1
Update E1
Unlock E1
Let's consider the first sub-task -- Lock E1: This would require E1 to be locked on all the nodes, and therefore we invoke the LockTask, with information about E1. Now, suppose n2 is a node which is catering to the user at a given point of time, then n2 would first lock E1 locally, and then broadcast this LockTask to the group. Since it is a broadcast, all the nodes (including n2) would receive and process the LockTask (and lock E1 locally). We can easily identify the sender and not perform the task there (since the task was issued to other nodes only after the sender had performed it). This means that n2 would ignore the task, but n1, n3..nn would need to perform it.
Once the given task is processed, in order to report the outcome back, we set the appropriate status flags in the original Task object and send it back to the sender. This time, however, we set it as a unicast communication (by specifying the recipient), since we know who is this message directed to. What it boils down to in terms of JGroups is: after performing the task n1, n3,...nn would all get the handle to the JGroups session and can use the same LockTask instance to report back the outcome of a given task. Upon receiving responses from all the peers, and if the responses are positive, the sender can then proceed with other operations (sub-tasks) -- Update E1, Unlock E1...etc. Exceptional Scenarios There are various junctures where the above conversation could fail, and thus, all the exceptional scenarios need to be taken care of. There were several failure scenarios that we handled. To list a few:
No response (or response timeout) from one or more peers
Failure of subtask in the sender node
Failure of subtask in any peer, etc.
Summary We discussed how JGroups can be an apt solution in situations where reliable distributed communication is necessary. We also saw an example where JGroups was used for enabling intra-node communication for broadcast and unicast services.