I opened an issue for this: https://github.com/OpenHFT/Chronicle-Queue/issues/534

I am trying to implement periodic purging of old queue files using a StoreFileListener. I am using the latest version, net.openhft:chronicle-queue:5.16.13. The problem I am hitting: after the roll cycle rolls to a new file (because the next cycle is active), I delete the queue file that was just released to the StoreFileListener, then create a new tailer and try to read messages. It throws the NPE below.

The same thing happens if I create a brand-new queue pointing at the same queue directory and create a tailer on it.

This is the NPE seen while trying to implement the queue file purging logic:

java.lang.NullPointerException
at net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts$StoreTailer.inACycle(SingleChronicleQueueExcerpts.java:1198)
at net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts$StoreTailer.readingDocument(SingleChronicleQueueExcerpts.java:1000)
at net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts$StoreTailer.readingDocument(SingleChronicleQueueExcerpts.java:942)
at net.openhft.chronicle.wire.MarshallableIn.readText(MarshallableIn.java:95)
at com.test.edge.api.queue.TestDeleteQueueFile.testQueueFileDeletionWhileInUse(TestDeleteQueueFile.java:133)

The test case to reproduce it is below:

package com.test.edge.api.queue;

import net.openhft.chronicle.core.time.SetTimeProvider;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.StoreFileListener;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TestDeleteQueueFile {

    private Path tempQueueDir;

    @Before
    public void setUp() throws Exception {
        tempQueueDir = Files.createTempDirectory("unitTestQueueDir");
    }

    @After
    public void tearDown() throws Exception {
        FileUtils.deleteDirectory(tempQueueDir.toFile());
        Assert.assertFalse(tempQueueDir.toFile().exists());
        System.out.println("Deleted " + tempQueueDir.toFile().getAbsolutePath());
    }

    @Test
    public void testQueueFileDeletionWhileInUse() throws Exception {
        SetTimeProvider timeProvider = new SetTimeProvider();

        String queueName = "unitTestQueue";

        QueueStoreFileListener listener = new QueueStoreFileListener(queueName);

        try (ChronicleQueue queue = SingleChronicleQueueBuilder.binary(tempQueueDir + "/" + queueName).
                timeProvider(timeProvider).storeFileListener(listener)
                .build()) {

            ExcerptAppender appender = queue.acquireAppender();

            System.out.println("first index : " + queue.firstIndex());
            Assert.assertEquals(Long.MAX_VALUE, queue.firstIndex());

            //write 10 records should go to first day file
            for (int i = 0; i < 10; i++) {
                appender.writeText("test");
            }

            long indexAfter10Records = appender.lastIndexAppended();
            System.out.println("index after writing 10 records: " + indexAfter10Records);

            //roll to next day file
            timeProvider.advanceMillis(24 * 60 * 60 * 1000);


            //write 5 records in next file
            for (int i = 0; i < 5; i++) {
                appender.writeText("test2");
            }

            Map<String, List> queueToRollFilesOnAcquireMap = listener.getQueueToRollFilesOnAcquireMap();
            Map<String, List> queueToRollFilesOnReleaseMap = listener.getQueueToRollFilesOnReleaseMap();

            Assert.assertEquals(1, queueToRollFilesOnAcquireMap.size());
            List<String> files = queueToRollFilesOnAcquireMap.get(queueName);
            Assert.assertEquals(1, files.size());
            String secondFile = files.get(0);

            //other will have 1 as only first file is released
            files = queueToRollFilesOnReleaseMap.get(queueName);
            Assert.assertEquals(1, files.size());
            String firstFile = files.get(0);

            Assert.assertNotEquals(firstFile, secondFile);


            long firstIndex = queue.firstIndex();


            long indexAfter5Records = appender.lastIndexAppended();
            System.out.println("index after writing 5 records: " + indexAfter5Records);

            //now lets create one reader which will read all content
            ExcerptTailer excerptTailer = queue.createTailer();
            for (int i = 0; i < 10; i++) {
                Assert.assertEquals("test", excerptTailer.readText());
            }

            System.out.println("index after reading 10 records: " + excerptTailer.index());
            Assert.assertEquals(firstIndex, excerptTailer.index() - 10);
            for (int i = 0; i < 5; i++) {
                Assert.assertEquals("test2", excerptTailer.readText());
            }

            System.out.println("index after reading 5 records: " + excerptTailer.index());
            Assert.assertEquals(indexAfter5Records, excerptTailer.index() - 1);

            //lets delete first file
            System.out.println("Deleting first release file: " + firstFile);
            Files.delete(Paths.get(firstFile));


            long firstIndex3 = queue.firstIndex();
            Assert.assertEquals(firstIndex, firstIndex3);

            // and create a tailer it should only read
            //data in second file
            ExcerptTailer excerptTailer2 = queue.createTailer();
            System.out.println("index before reading 5: " + excerptTailer2.index());

            //AFTER CREATING A BRAND NEW TAILER, BELOW ASSERTION ALSO FAILS
            //WAS EXPECTING THAT TAILER CAN READ FROM START OF QUEUE BUT INDEX IS LONG.MAX

            //Assert.assertEquals(indexAfter5Records -5, excerptTailer2.index() -1);

            //BELOW THROWS NPE, WAS EXPECTING THAT WE CAN READ FROM SECOND DAILY QUEUE FILE
            System.out.println("excerptTailer2: " + excerptTailer2.peekDocument());
            for (int i = 0; i < 5; i++) {
                Assert.assertEquals("test2", excerptTailer2.readText());
            }

            //SAME ERROR WHEN CREATING A BRAND NEW QUEUE AND TRYING TO READ IT

//            // create brand new queue and read see how it behaves
//            ChronicleQueue queue2 =  SingleChronicleQueueBuilder.binary(tempQueueDir + "/" + queueName).
//                    timeProvider(timeProvider).storeFileListener(listener)
//                    .build();
//
//            long firstIndex2 = queue2.firstIndex();
//            Assert.assertNotEquals(firstIndex, firstIndex2);
//            Assert.assertNotEquals(indexAfter5Records, firstIndex2);
//
//            ExcerptTailer excerptTailer3 = queue.createTailer();
//            for (int i = 0; i < 5; i++) {
//                Assert.assertEquals("test2", excerptTailer3.readText());
//            }

        }

    }

    final class QueueStoreFileListener implements StoreFileListener {

        private String queueName;
        private Map<String, List> queueToRollFilesOnReleaseMap = new HashMap<>();
        private Map<String, List> queueToRollFilesOnAcquireMap = new HashMap<>();

        public QueueStoreFileListener(String queueName) {
            this.queueName = queueName;
        }

        @Override
        public void onReleased(int cycle, File file) {
            System.out.println("onReleased called cycle: " + cycle + "file: " + file);

            List<String> files = queueToRollFilesOnReleaseMap.get(queueName);
            if (files == null) {
                files = new ArrayList<>();
            }

            String fileAbsPath = file.getAbsolutePath();
            if (!files.contains(fileAbsPath)) {
                files.add(fileAbsPath);
            }
            queueToRollFilesOnReleaseMap.put(queueName, files);

            //update acquire file map
            List<String> acqfiles = queueToRollFilesOnAcquireMap.get(queueName);
            acqfiles.remove(file.getAbsolutePath());
            queueToRollFilesOnAcquireMap.put(queueName, acqfiles);

        }

        @Override
        public void onAcquired(int cycle, File file) {
            System.out.println("onAcquired called cycle: " + cycle + "file: " + file);

            List<String> files = queueToRollFilesOnAcquireMap.get(queueName);
            if (files == null) {
                files = new ArrayList<>();
            }

            String fileAbsPath = file.getAbsolutePath();
            if (!files.contains(fileAbsPath)) {
                files.add(fileAbsPath);
            }

            queueToRollFilesOnAcquireMap.put(queueName, files);


        }

        public Map<String, List> getQueueToRollFilesOnAcquireMap() {
            return queueToRollFilesOnAcquireMap;
        }

        public Map<String, List> getQueueToRollFilesOnReleaseMap() {
            return queueToRollFilesOnReleaseMap;
        }
    }
}
2 Answers

On this line

Assert.assertEquals(indexAfter5Records -5, excerptTailer2.index() -1);

`indexAfter5Records` is `0x100000004`, as expected. This is because the top 32 bits hold the cycle number, `1`, and the low bits hold the record within the cycle, `4`, i.e. the 5th record counting from `0`.
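The cycle/sequence split can be illustrated with plain bit arithmetic. This is a sketch assuming the 32-bit cycle / 32-bit sequence layout of the default daily roll cycle (other roll cycles use different splits; Chronicle's `RollCycle` interface also exposes `toCycle`/`toSequenceNumber` helpers for this):

```java
// Sketch: decompose a Chronicle Queue index into cycle and sequence,
// assuming the daily roll cycle's 32-bit cycle / 32-bit sequence split.
public class IndexDemo {
    static int cycleOf(long index) {
        return (int) (index >>> 32);       // top 32 bits: cycle number
    }

    static long seqOf(long index) {
        return index & 0xFFFF_FFFFL;       // low 32 bits: record within the cycle
    }

    static long toIndex(int cycle, long seq) {
        return ((long) cycle << 32) | seq; // recombine cycle and sequence
    }

    public static void main(String[] args) {
        long indexAfter5Records = 0x1_0000_0004L;
        // cycle 1, sequence 4 => the 5th record of the second daily file
        System.out.println("cycle=" + cycleOf(indexAfter5Records)
                + " seq=" + seqOf(indexAfter5Records));   // cycle=1 seq=4
        // start of cycle 1, as used by the moveToIndex workaround
        System.out.println(Long.toHexString(toIndex(1, 0))); // 100000000
    }
}
```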

`excerptTailer2.index()` should start from the beginning. On my Windows machine this is `0`, because Windows does not always let you delete a memory-mapped file before the process exits. On Linux this is not a problem.

In my case, since the file could not be deleted, I used

excerptTailer2.moveToIndex(0x1_0000_0000L);

and it reads the expected text.

A recent release broke readText/writeText, but it should be fixed in 5.16.15.

If you are looking for a very stable release, I suggest using 4.15.7. If you want to use the latest version, I would pick one that wasn't followed by another release the next day ;)

We hope to make 5.16.x our stable release soon and to publish 6.17.x, which will include Java 11 support.

Answered 2018-09-29T18:58:26.723
0

I was facing the same problem and found a solution: refresh the directory listing before creating the tailer. E.g., you could add

queue.refreshDirectoryListing();

before

ExcerptTailer excerptTailer2 = queue.createTailer();

It will force a re-read of the list of queue files. You can also check the implementation of this method to see why the listing is not refreshed by default:

private void setFirstAndLastCycle() {
    long now = time.currentTimeMillis();
    if (now <= firstAndLastCycleTime) {
        return;
    }

    firstCycle = directoryListing.getMinCreatedCycle();
    lastCycle = directoryListing.getMaxCreatedCycle();

    firstAndLastCycleTime = now;
}
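The guard above refreshes at most once per millisecond of the queue's time provider, so a file deleted between two calls in the same millisecond is not noticed and the cached first/last cycles stay stale. A minimal sketch of that behaviour (hypothetical class and field names, mimicking only the time guard, not the real directory scan):

```java
// Sketch: why the first/last-cycle cache can stay stale. The refresh is
// skipped whenever the clock has not advanced past the last refresh time.
public class CycleCacheDemo {
    long firstAndLastCycleTime = 0;
    long now = 1000;                 // stand-in for time.currentTimeMillis()
    int refreshCount = 0;

    void setFirstAndLastCycle() {
        if (now <= firstAndLastCycleTime)
            return;                  // same millisecond: keep cached cycles
        refreshCount++;              // stand-in for re-scanning the directory
        firstAndLastCycleTime = now;
    }

    public static void main(String[] args) {
        CycleCacheDemo demo = new CycleCacheDemo();
        demo.setFirstAndLastCycle(); // first call: refreshes
        demo.setFirstAndLastCycle(); // same millisecond: skipped
        System.out.println("refreshes=" + demo.refreshCount); // refreshes=1
        demo.now += 1;               // clock advances by 1 ms
        demo.setFirstAndLastCycle(); // refreshes again
        System.out.println("refreshes=" + demo.refreshCount); // refreshes=2
    }
}
```

This is why an explicit `refreshDirectoryListing()` (or a forced clock advance, as with the test's `SetTimeProvider`) is needed before the new tailer can see the changed set of files.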
Answered 2019-01-11T15:31:53.440