我正在尝试创建一个持久的 ZNode 并存储我已处理的特定文件的行数。创建就像它应该的那样工作,从节点读取数据也是如此,但是如果它在相同的代码中,则删除不起作用。我会解释我的意思。
我创建了函数:
setOrCreateFileCheckpoint(fileName: String, lineNumber: Int) :- 检查 ZNode 是否存在,如果不存在则创建它并将存储的值设置为lineNumber getFileCheckpoint(fileName: String) :- 返回存储在 ZNode 中的值 deleteFileCheckpoint(fileName : String) :- 删除 ZNode
以下是所有三个的代码:
/*
updates or creates a checkpoint for a file being processed
*/
def setOrCreateFileCheckpoint(fileName: String, lineNumber: Int): Unit =
{
val fileCheckpointPath = checkpointPoolPath + "/" +fileName
val zk = getZookeeper
val zkCuratorClient = getZookeeperCuratorClient
if ( zk.exists(fileCheckpointPath, false) == null)
{
val node = new PersistentNode(zkCuratorClient, CreateMode.PERSISTENT, false, fileCheckpointPath, lineNumber.toString.getBytes())
node.start()
}
else
zk.setData(fileCheckpointPath, lineNumber.toString.getBytes(), -1)
}
/*
gets checkpoint for a file
*/
def getFileCheckpoint(fileName: String): Int =
{
val fileCheckpointPath = checkpointPoolPath + "/" +fileName
val zk = getZookeeper
val zkCuratorClient = getZookeeperCuratorClient
if ( zk.exists(fileCheckpointPath, false) != null)
new String(zk.getData(fileCheckpointPath, false, null)).toInt
else
0
}
/*
deletes the file checkpoint so that we don't keep accumulating zNodes on the zookeeper
*/
def deleteFileCheckpoint(fileName: String): Unit =
{
val fileCheckpointPath = checkpointPoolPath + "/" +fileName
val zk = getZookeeper
if ( zk.exists(fileCheckpointPath, false) == null)
{
throw RuntimeException("Trying to delete checkpoint that doesn't exist for file: " + fileName)
}
else
{
/*println(zk.exists(fileCheckpointPath, false).getVersion)
zk.delete(fileCheckpointPath, zk.exists(fileCheckpointPath, false).getVersion)*/
deleteChildren(zk, fileCheckpointPath, true)
}
}
以下是我正在测试和困惑的代码:
ZookeeperUtility.setOrCreateFileCheckpoint("file1", 2000) //let's call it cre1
println(ZookeeperUtility.getFileCheckpoint("file1")) //let's call it get1
ZookeeperUtility.deleteFileCheckpoint("file1") //let's call it del1
println("del1")
ZookeeperUtility.deleteFileCheckpoint("file1") //let's call in del2
println("del2")
运行 1:
Step1:我运行上面显示的代码
结果:在 del2 上遇到错误
Step2:将cre1注释掉,再次运行代码
结果:获取节点,给出正确的值作为在 del2 上遇到的结果错误。这令人难以置信。我不明白为什么。该节点应该被删除。
Step3:cre1仍然被注释,与上一步相同,再次运行代码
结果:节点不存在在 get1 处给出 0,这意味着节点不存在。在 del1 处遇到错误。这是第2步本身应该发生的事情
运行2:
Step1:注释掉del2,运行代码
结果:创建节点,获取正确数据,正常退出
Step2:注释掉cre1,运行代码
结果:从应该删除的节点中获取值 2000。正常退出
Step3:再次运行与step2相同的代码
结果:获取 0,在 del1 上遇到错误。
如果我一次运行代码,如果我只在一次运行中创建,只在下一次运行中获取并且只在之后的运行中删除,一切都会正常运行。我快要拔掉头发了。
PS 代码是用 Scala 编写的,但我使用的是 Java API。Scala 可以无缝地处理 Java 类。
如果您查看我已注释掉一部分的deleteFileCheckpoint函数,我也尝试过这种方法。它具有完全相同的行为。