您可以将所需的节点设置为协调器。:
github 示例
我正在添加同步块以在所有节点上完成更改并完成代码:
public static final String GMS_DELTA_VIEW_FIELD_NAME = "use_delta_views";
/**
* Change coordinator to {@code desiredCoordinator}. Must be invoked from coordinator.
* @param desiredCoordinator
* @return {@code true} if changes success, {@code false} overwise
*/
boolean changeCoordinator(JChannel currentChannel, Address desiredCoordinator) {
if(!Util.isCoordinator(currentChannel.getAddress)) {
throw new RuntimeException("The current node is not coordinator.");
}
ArrayList<Address> newMembersOrder = Lists.newArrayList(currentView.getMembers());
// Switch desired node to first place
Collections.swap(newMembersOrder, 0, newMembersOrder.indexOf(desiredCoordinator));
// Create new view
long newId = currentView.getViewId().getId() + 1;
View newView = new View(newMembersOrder.get(0), newId, newMembersOrder);
GMS gms = (GMS)clusterChannel.getProtocolStack().findProtocol(GMS.class);
CustomProtocol protocol = new CustomProtocol(newMembersOrder.stream()
.filter(item -> !item.equals(currentChannel.getAddress()))
.collect(Collectors.toSet()));
boolean oldUseDeltaViews = (Boolean)gms.getValue(GMS_DELTA_VIEW_FIELD_NAME);
try {
// Disable using_delta_views at GMS
gms.setValue(GMS_DELTA_VIEW_FIELD_NAME, false);
// Insert custom protocol below GMS for synchronizing with VIEW_ACK events
currentChannel.getProtocolStack().insertProtocolInStack(protocol, gms, ProtocolStack.BELOW);
gms.castViewChange(newView, null, newMembersOrder);
// Wait no more than 30 seconds to all VIEW_ACK responses
if (!protocol.collector.waitForAllAcks(TimeUnit.SECONDS.toMillis(30))) {
return false;
}
return true;
}
finally {
// Repair old state
gms.setValue(GMS_DELTA_VIEW_FIELD_NAME, oldUseDeltaViews);
currentChannel.getProtocolStack().removeProtocol(protocol);
}
}
private class CustomProtocol extends Protocol implements UpHandler {
AckCollector collector;
public CustomProtocol(Collection<Address> waitedAddresses) {
collector = new AckCollector(waitedAddresses);
}
@Override
public Object up(Event evt) {
if(evt.getType() == Event.MSG) {
final Message msg=(Message)evt.getArg();
GmsHeader hdr=(GmsHeader)msg.getHeader(proto_id);
if(hdr != null && hdr.getType() == GmsHeader.VIEW_ACK) {
collector.ack(msg.getSrc());
}
}
return super.up(evt);
}
}