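I have a worker class that registers a znode at /workers/worker-1, and I want to receive the tasks the master assigns to this worker under /assign/worker-1/task1. I have registered a main listener and also started a PathChildrenCache on the assignment path (/assign/worker-1), but for some reason I am not receiving any events on the worker. Here is what the code looks like: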
public class Worker {
    @Autowired
    public Worker(CuratorFramework curatorFramework) {
        client = curatorFramework;
        // initialize assignments cache
        assignments = new PathChildrenCache(client, Constants.ZK_ASSIGNMENTS_PATH + "/" + instanceId, false);
    }

    /**
     * Starts the worker
     * @throws Exception
     */
    public void start() throws Exception {
        log.debug("starting worker " + instanceId);
        // create worker node
        client.create()
            .withMode(CreateMode.EPHEMERAL)
            .inBackground()
            .forPath(Constants.ZK_WORKERS_PATH + "/" + instanceId, new byte[0]);
        // create worker assignment node
        client.create()
            .withMode(CreateMode.PERSISTENT)
            .inBackground()
            .forPath(Constants.ZK_ASSIGNMENTS_PATH + "/" + instanceId, new byte[0]);
        // register listener for assignments cache
        assignments.getListenable().addListener(assignmentsListener);
        assignments.start();
        // main worker listener
        client.getCuratorListenable().addListener(new CuratorListener() {
            @Override
            public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                log.debug("worker event => " + event);
            }
        });
    }

    /**
     * Assignments cache listener
     */
    PathChildrenCacheListener assignmentsListener = new PathChildrenCacheListener() {
        @Override
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            switch (event.getType()) {
                case CHILD_ADDED:
                    log.debug("worker acknowledging assignment of " + event.getData().getPath());
                    break;
                default:
                    log.debug("worker received event " + event);
            }
        }
    };
}
So after this, if I create a node under /assign/worker-1, I do not receive any notification on the worker. Any ideas what I might be doing wrong?
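To make the path layout concrete, here is a minimal, library-free sketch of how the znodes relate to each other. The class name `AssignPaths`, its helper methods, and the literal values `/workers` and `/assign` are assumptions for illustration only (the real values live in `Constants`, which is not shown here). It only checks that the task node I create is a direct child of the path the PathChildrenCache is started on, since that cache reports changes to direct children of its path.

```java
// Hypothetical helper, not part of the worker code above.
class AssignPaths {
    // Assumed values; the real ones are defined in Constants (not shown).
    static final String ZK_WORKERS_PATH = "/workers";
    static final String ZK_ASSIGNMENTS_PATH = "/assign";

    // Path of the ephemeral znode the worker registers itself under.
    static String workerPath(String instanceId) {
        return ZK_WORKERS_PATH + "/" + instanceId;
    }

    // Path the PathChildrenCache is started on.
    static String assignmentsPath(String instanceId) {
        return ZK_ASSIGNMENTS_PATH + "/" + instanceId;
    }

    // Path of a single task assigned to the worker.
    static String taskPath(String instanceId, String taskName) {
        return assignmentsPath(instanceId) + "/" + taskName;
    }

    // True only when child sits exactly one level below parent.
    static boolean isDirectChild(String parent, String child) {
        if (!child.startsWith(parent + "/")) {
            return false;
        }
        String rest = child.substring(parent.length() + 1);
        return !rest.isEmpty() && rest.indexOf('/') < 0;
    }

    public static void main(String[] args) {
        String watched = assignmentsPath("worker-1");
        String task = taskPath("worker-1", "task1");
        System.out.println(watched + " -> " + task
            + " direct child: " + isDirectChild(watched, task));
    }
}
```

With these assumed values the task node is a direct child of the watched path, so as far as I can tell the paths themselves are not the problem.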
Thanks