2

我一直在研究一个持续监控分布式原子长计数器的过程。它使用以下类ZkClient的方法每分钟监视一次getCounter。事实上,我有多个线程运行,每个线程都在监视存储在 Zookeeper 节点中的不同计数器(分布式原子长)。每个线程通过getCounter方法的参数指定计数器的路径。

public class TagserterZookeeperManager {

public enum ZkClient {
    COUNTER("10.11.18.25:2181");  // Integration URL

    private CuratorFramework client;
    private ZkClient(String servers) {
        Properties props = TagserterConfigs.ZOOKEEPER.getProperties();
        String zkFromConfig = props.getProperty("servers", "");
        if (zkFromConfig != null && !zkFromConfig.isEmpty()) {
            servers = zkFromConfig.trim();
        }
        ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.newClient(servers, exponentialBackoffRetry);
        client.start();
    }

    public CuratorFramework getClient() {
        return client;
    }
}

public static String buildPath(String ... node) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < node.length; i++) {
        if (node[i] != null && !node[i].isEmpty()) {
            sb.append("/");
            sb.append(node[i]);
        }
    }
    return sb.toString();
}

public static DistributedAtomicLong getCounter(String taskType, int hid, String jobId, String countType) {
    String path = buildPath(taskType, hid+"", jobId, countType);
    Builder builder = PromotedToLock.builder().lockPath(path + "/lock").retryPolicy(new ExponentialBackoffRetry(10, 10));
    DistributedAtomicLong count = new DistributedAtomicLong(ZkClient.COUNTER.getClient(), path, new RetryNTimes(5, 20), builder.build());
    return count;
}

}

在线程中,这就是我调用此方法的方式:

    DistributedAtomicLong counterTotal = TagserterZookeeperManager
                        .getCounter("testTopic", hid, jobId, "test");

现在似乎在线程运行了几个小时之后,在一个阶段,我开始在尝试读取计数org.apache.zookeeper.KeeperException$ConnectionLossException的方法中收到以下异常:getCounter

org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /contentTaskProd at org.apache.zookeeper.KeeperException.create(KeeperException.java:99) at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)在 org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1045) 在 org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1073) 在 org.apache.curator.utils.ZKPaths.mkdirs(ZKPaths.java :215) 在 org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107) 在 org.apache.curator.utils 的 org.apache.curator.utils.EnsurePath$InitialHelper$1.call(EnsurePath.java:148) .EnsurePath$InitialHelper.ensure(EnsurePath.java:141) 在 org.apache.curator.utils.EnsurePath.ensure(EnsurePath.java:99) 在 org.apache。curator.framework.recipes.atomic.DistributedAtomicValue.getCurrentValue(DistributedAtomicValue.java:254) 在 org.apache.curator.framework.recipes.atomic.DistributedAtomicValue.get(DistributedAtomicValue.java:91) 在 org.apache.curator.framework。 recipes.atomic.DistributedAtomicLong.get(DistributedAtomicLong.java:72) ...

我一直从那里得到这个异常一段时间,我觉得它导致了一些内部内存泄漏,最终导致 OutOfMemory 错误并且整个过程退出。有谁知道这可能是什么原因?为什么 Zookeeper 会突然开始抛出连接丢失异常?进程退出后,我可以通过我编写的另一个小型控制台程序(也使用 curator)手动连接到 Zookeeper,并且一切看起来都很好。

4

1 回答 1

2

为了使用curator您可以使用NodeCache监视 Zookeeper 中的节点,这不会解决您的连接问题......但不是每分钟轮询一次节点,您可以在它更改时获得推送事件。

以我的经验,可以NodeCache很好地处理断开连接和恢复连接。

于 2016-01-11T20:30:52.110 回答