0

我正在尝试创建一个持久的 ZNode 并存储我已处理的特定文件的行数。创建就像它应该的那样工作,从节点读取数据也是如此,但是如果它在相同的代码中,则删除不起作用。我会解释我的意思。

我创建了函数:

setOrCreateFileCheckpoint(fileName: String, lineNumber: Int) :- 检查 ZNode 是否存在,如果不存在则创建它并将存储的值设置为lineNumber getFileCheckpoint(fileName: String) :- 返回存储在 ZNode 中的值 deleteFileCheckpoint(fileName : String) :- 删除 ZNode

以下是所有三个的代码:

/*
updates or creates a checkpoint for a file being processed
 */
def setOrCreateFileCheckpoint(fileName: String, lineNumber: Int): Unit =
    {
        val fileCheckpointPath = checkpointPoolPath + "/" +fileName
        val zk = getZookeeper
        val zkCuratorClient = getZookeeperCuratorClient

        if ( zk.exists(fileCheckpointPath, false) == null)
            {
                val node = new PersistentNode(zkCuratorClient, CreateMode.PERSISTENT, false, fileCheckpointPath, lineNumber.toString.getBytes())
                node.start()
            }
        else
            zk.setData(fileCheckpointPath, lineNumber.toString.getBytes(), -1)
    }

/*
gets checkpoint for a file
 */
def getFileCheckpoint(fileName: String): Int =
    {
        val fileCheckpointPath = checkpointPoolPath + "/" +fileName
        val zk = getZookeeper
        val zkCuratorClient = getZookeeperCuratorClient

        if ( zk.exists(fileCheckpointPath, false) != null)
            new String(zk.getData(fileCheckpointPath, false, null)).toInt

        else
            0

    }

/*
deletes the file checkpoint so that we don't keep accumulating zNodes on the zookeeper
 */
def deleteFileCheckpoint(fileName: String): Unit =
    {
        val fileCheckpointPath = checkpointPoolPath + "/" +fileName

        val zk = getZookeeper

        if ( zk.exists(fileCheckpointPath, false) == null)
        {
            throw RuntimeException("Trying to delete checkpoint that doesn't exist for file: " + fileName)
        }
        else
            {
                /*println(zk.exists(fileCheckpointPath, false).getVersion)
                zk.delete(fileCheckpointPath, zk.exists(fileCheckpointPath, false).getVersion)*/
                deleteChildren(zk, fileCheckpointPath, true)

            }
    }

以下是我正在测试和困惑的代码:

        ZookeeperUtility.setOrCreateFileCheckpoint("file1", 2000) //let's call it cre1

        println(ZookeeperUtility.getFileCheckpoint("file1")) //let's call it get1

        ZookeeperUtility.deleteFileCheckpoint("file1") //let's call it del1
        println("del1")

        ZookeeperUtility.deleteFileCheckpoint("file1") //let's call in del2
        println("del2")

运行 1:

Step1:我运行上面显示的代码

结果:在 del2 上遇到错误

Step2:将cre1注释掉,再次运行代码

结果:获取节点,给出正确的值作为在 del2 上遇到的结果错误。这令人难以置信。我不明白为什么。该节点应该被删除。

Step3:cre1仍然被注释,与上一步相同,再次运行代码

结果:节点不存在在 get1 处给出 0,这意味着节点不存在。在 del1 处遇到错误。这是第2步本身应该发生的事情

运行2:

Step1:注释掉del2,运行代码

结果:创建节点,获取正确数据,正常退出

Step2:注释掉cre1,运行代码

结果:从应该删除的节点中获取值 2000。正常退出

Step3:再次运行与step2相同的代码

结果:获取 0,在 del1 上遇到错误。

如果我一次运行代码,如果我只在一次运行中创建,只在下一次运行中获取并且只在之后的运行中删除,一切都会正常运行。我快要拔掉头发了。

PS 代码是用 Scala 编写的,但我使用的是 Java API。Scala 可以无缝地处理 Java 类。

如果您查看我已注释掉一部分的deleteFileCheckpoint函数,我也尝试过这种方法。它具有完全相同的行为。

4

1 回答 1

1

这令人难以置信。我不明白为什么。该节点应该被删除。

我不知道你为什么感到惊讶。您正在创建一个PersistentNode存在的节点,以便在节点被删除时自动重新创建节点。事实上,周围的所有代码都非常令人费解。它正在复制PersistentNode内部所做的事情。你不需要做所有其他的事情。只需使用PersistentNode.

此外,像 checkExists() 这样的代码后跟基于结果的操作几乎永远不会在生产中工作。ZooKeeper 是高度并发且最终一致的。这就是为什么您应该始终使用 Curator 的食谱而不是手动编码解决方案的原因。

于 2020-04-23T03:47:23.097 回答