11

我正在使用 Apache Curator 库对 Zookeeper 进行领导选举。我的应用程序代码部署在多台机器上,我只需要从一台机器上执行我的代码,这就是为什么我在 zookeeper 上进行领导选举,以便我可以检查我是否是领导者,然后执行此代码。

下面是我的LeaderElectionExecutor课程,它确保每个应用程序都有一个 Curator 实例

public class LeaderElectionExecutor {

    private ZookeeperClient zookClient;

    private static final String LEADER_NODE = "/testleader";

    private static class Holder {
        static final LeaderElectionExecutor INSTANCE = new LeaderElectionExecutor();
    }

    public static LeaderElectionExecutor getInstance() {
        return Holder.INSTANCE;
    }

    private LeaderElectionExecutor() {
        try {
            String hostname = Utils.getHostName();

            String nodes = "host1:2181,host2:2181;

            zookClient = new ZookeeperClient(nodes, LEADER_NODE, hostname);
            zookClient.start();

            // added sleep specifically for the leader to get selected
            // since I cannot call isLeader method immediately after starting the latch
            TimeUnit.MINUTES.sleep(1);
        } catch (Exception ex) {
            // logging error
            System.exit(1);
        }
    }

    public ZookeeperClient getZookClient() {
        return zookClient;
    }
}

下面是我的ZookeeperClient代码 -

// can this class be improved in any ways?
public class ZookeeperClient {

    private CuratorFramework client;
    private String latchPath;
    private String id;
    private LeaderLatch leaderLatch;

    public ZookeeperClient(String connString, String latchPath, String id) {
        client = CuratorFrameworkFactory.newClient(connString, new ExponentialBackoffRetry(1000, Integer.MAX_VALUE));
        this.id = id;
        this.latchPath = latchPath;
    }

    public void start() throws Exception {
        client.start();
        leaderLatch = new LeaderLatch(client, latchPath, id);
        leaderLatch.start();
    }

    public boolean isLeader() {
        return leaderLatch.hasLeadership();
    }

    public Participant currentLeader() throws Exception {
        return leaderLatch.getLeader();
    }

    public void close() throws IOException {
        leaderLatch.close();
        client.close();
    }

    public CuratorFramework getClient() {
        return client;
    }

    public String getLatchPath() {
        return latchPath;
    }

    public String getId() {
        return id;
    }

    public LeaderLatch getLeaderLatch() {
        return leaderLatch;
    }
}

现在在我的应用程序中,我正在使用这样的代码 -

public void method01() {
    ZookeeperClient zookClient = LeaderElectionExecutor.getInstance().getZookClient();
    if (zookClient.isLeader()) {
        // do something
    }
}

public void method02() {
    ZookeeperClient zookClient = LeaderElectionExecutor.getInstance().getZookClient();
    if (zookClient.isLeader()) {
        // do something
    }
}

问题陈述:-

在 Curator 库中 -isLeader()启动闩锁后立即调用将不起作用。领导者被选中需要时间。仅因为这个原因,我在我的LeaderElectionExecutor代码中添加了 1 分钟的睡眠时间,它工作正常,但我想这不是正确的方法。

有没有更好的方法来做到这一点?记住这一点,我需要一种方法来检查我是否是领导者,然后执行这段代码。我不能用一个方法做所有事情,所以我需要isLeader从不同的类和方法中调用方法来检查我是否是领导者,然后只执行这段代码。

我正在使用 Zookeeper 3.4.5 和 Curator 1.7.1 版本。

4

4 回答 4

1

一旦我解决了一个与您的问题非常相似的问题。我就是这样做的。

首先,我让我的对象由 Spring 管理。所以,我有一个可以通过容器注入的 LeaderLatch。使用 LeaderLatch 的组件之一是一个LeadershipWatcherRunnable 接口的实现,它将领导事件分派给其他组件。最后这些组件是我命名的接口的实现LeadershipObserver。的实现LeadershipWatcher主要类似于以下代码:

@Component
public class LeadershipWatcher implements Runnable {
  private final LeaderLatch leaderLatch;
  private final Collection<LeadershipObserver> leadershipObservers;

  /* constructor with @Inject */

  @Override
  public void run() {
    try {
      leaderLatch.await();

      for (LeadershipObserver observer : leadershipObservers) {
        observer.granted();
      }
    } catch (InterruptedException e) {
      for (LeadershipObserver observer : leadershipObservers) {
        observer.interrupted();
      }
    }
  }
}

由于这只是一个草图,我建议您增强此代码,也许应用命令模式来调用观察者,或者甚至将观察者提交到线程池,如果他们的工作是阻塞或长时间运行的 CPU 密集型任务。

于 2015-02-13T16:34:23.620 回答
0

我以前没有与动物园管理员或策展人合作过,所以我的回答持保留态度。

设置一个标志。

布尔 isLeaderSelected = false;

在 Latch 开始时,将标志设置为 false。选择领导者后,将标志设置为 true。

在 isLeader() 函数中:

isLeader(){
while(!isLeaderSelected){} //waits until leader is selected

//do the rest of the function
}

这也是一个比较老套的解决方法,但它应该允许 isLeader 方法尽快执行。如果它们在不同的类中,getter 应该能够提供 isLeaderSelected。

于 2015-02-08T09:53:03.403 回答
0

这是为了恢复一个老问题......

这类似于 srav 给出的答案,但我会警告不要使用该代码,因为它利用忙等待并且可能导致线程内发出的某些回调永远不会被调用,可能永远阻塞。此外,如果存在真正的问题,它可以永远重试。

这是我的解决方案,它利用 CuratorClient 的重试策略在必要时尝试等待领导选举。

    RetryPolicy retryPolicy = _client.getZookeeperClient().getRetryPolicy();
    RetrySleeper awaitLeadership = _leaderLatch::await;

    final long start = System.currentTimeMillis();
    int count = 0;

    do {
        try {
            // curator will return a dummy leader in the case when a leader has
            // not yet actually been elected. This dummy leader will have isLeader
            // set to false, so we need to check that we got a true leader
            if (_leaderLatch.getLeader().isLeader()) {
                return;
            }
        } catch (KeeperException.NoNodeException e) {
            // this is the case when the leader node has not yet been created
            // by any client - this is fine because we are still waiting for
            // the algorithm to start up so we ignore the error
        }
    } while (retryPolicy.allowRetry(count++, System.currentTimeMillis() - start, awaitLeadership));

    // we have exhausted the retry policy and still have not elected a leader
    throw new IOException("No leader was elected within the specified retry policy!");

虽然查看您的 CuratorFramework 初始化,但Integer.MAX_VALUE在指定重试策略时我会警告不要使用...

我希望这有帮助!

于 2017-05-10T04:29:11.577 回答
0
leaderLatch = new LeaderLatch(curatorClient, zkPath, String.valueOf(new Random().nextInt()));
leaderLatch.start();
Participant participant;
while(true) {
  participant = leaderLatch.getLeader();
  // Leader election happens asynchronously after calling start, this is a hack to wait until election happens
  if (!(participant.getId().isEmpty() || participant.getId().equalsIgnoreCase(""))) {
    break;
  }
}
if(leaderLatch.hasLeadership()) {
...
}

请注意,getLeader 返回一个 id 为 "" 的虚拟参与者,直到它选出一个领导者。

于 2015-08-31T22:25:28.220 回答