6

我正在寻找一种方法来强制 JGroups 使用特定服务器作为协调器,如果该服务器不存在,则选择一个新的协调器,直到指定的那个重新加入集群并接管作为协调器。

在这种情况下,我们通过 Coordinator 将一些信息推送到集群中,监听主题以获取更新,但是获取和处理这些更新可能是资源密集型的,因此我们不希望它向外部世界提供任何服务。因此,在集群前面的负载均衡器中,我们将其设置为不发送到协调器。但是因为 Coordinator 是随机选出的,所以我们基本上需要关闭集群,直到只有单机在里面,然后再启动集群的其余部分。

4

3 回答 3

5

目前没有办法做到这一点。Jgroups 花费了大量时间来确保协调器可以是组中的任何节点。维护和监控组成员列表健康的所有任务在组中的所有成员之间共享,以确保协调员的职责不会过多地影响协调员的绩效。标准 GMS(Group MembershipService)协议栈类负责协调器的选择。目前它只是视图列表中的第一个主机。

要获得这种行为,您将必须实现自己的协议栈。有趣的是,我一直在为 Jgroups 开发一个协议栈,它大致实现了您所要求的,但是它还没有准备好迎接黄金时段。

然而,其他人可能已经在这个问题上受到了打击。我建议在jgroups 邮件列表上发帖并提出同样的问题。

于 2012-06-05T22:46:32.077 回答
2

刚刚偶然发现这个帖子。在 JGroups 中有一种简单而标准的方法可以做到这一点:[1]。它本质上是让用户代码控制视图生成。

[1] http://www.jgroups.org/manual4/index.html#MembershipChangePolicy

于 2019-01-22T11:09:34.800 回答
1

您可以将所需的节点设置为协调器。: github 示例

我正在添加同步块以在所有节点上完成更改并完成代码:

public static final String GMS_DELTA_VIEW_FIELD_NAME = "use_delta_views";

/**
 * Change coordinator to {@code desiredCoordinator}. Must be invoked from coordinator.
 * @param desiredCoordinator
 * @return {@code true} if changes success, {@code false} overwise 
 */
boolean changeCoordinator(JChannel currentChannel, Address desiredCoordinator) {

    if(!Util.isCoordinator(currentChannel.getAddress)) {
        throw new RuntimeException("The current node is not coordinator.");
    }

    ArrayList<Address> newMembersOrder = Lists.newArrayList(currentView.getMembers());        

    // Switch desired node to first place
    Collections.swap(newMembersOrder, 0, newMembersOrder.indexOf(desiredCoordinator));        

    // Create new view
    long newId = currentView.getViewId().getId() + 1;
    View newView = new View(newMembersOrder.get(0), newId, newMembersOrder);

    GMS gms = (GMS)clusterChannel.getProtocolStack().findProtocol(GMS.class);
    CustomProtocol protocol = new CustomProtocol(newMembersOrder.stream()
            .filter(item -> !item.equals(currentChannel.getAddress()))
            .collect(Collectors.toSet()));

    boolean oldUseDeltaViews = (Boolean)gms.getValue(GMS_DELTA_VIEW_FIELD_NAME);
    try {
        // Disable using_delta_views at GMS
        gms.setValue(GMS_DELTA_VIEW_FIELD_NAME, false);

        // Insert custom protocol below GMS for synchronizing with VIEW_ACK events
        currentChannel.getProtocolStack().insertProtocolInStack(protocol, gms, ProtocolStack.BELOW);
        gms.castViewChange(newView, null, newMembersOrder);

        // Wait no more than 30 seconds to all VIEW_ACK responses
        if (!protocol.collector.waitForAllAcks(TimeUnit.SECONDS.toMillis(30))) {                
            return false;
        }

        return true;
    }
    finally {
        // Repair old state
        gms.setValue(GMS_DELTA_VIEW_FIELD_NAME, oldUseDeltaViews);
        currentChannel.getProtocolStack().removeProtocol(protocol);
    }
}

private class CustomProtocol extends Protocol implements UpHandler {

    AckCollector collector;

    public CustomProtocol(Collection<Address> waitedAddresses) {
        collector = new AckCollector(waitedAddresses);
    }

    @Override
    public Object up(Event evt) {

        if(evt.getType() == Event.MSG) {
            final Message msg=(Message)evt.getArg();
            GmsHeader hdr=(GmsHeader)msg.getHeader(proto_id);
            if(hdr != null && hdr.getType() == GmsHeader.VIEW_ACK) {                    
                collector.ack(msg.getSrc());
            }
        }

        return super.up(evt);
    }
}
于 2015-11-27T16:11:28.517 回答