我为此打开了一个问题:https ://github.com/OpenHFT/Chronicle-Queue/issues/534
我正在尝试使用 StoreFileListener 实现定期的旧队列文件清除逻辑。我正在使用最新版本的 net.openhft:chronicle-queue:5.16.13。我遇到的问题是:由于下一个循环处于活动状态,滚动循环文件滚动后,我删除了刚刚在 StoreFileListener 中发布的队列文件,然后我创建了一个新的 tailer 并尝试读取消息。它低于 NPE:
如果我创建一个指向相同队列目录的全新队列并创建一个tailer,也会发生同样的情况。
在尝试实现队列文件清除逻辑时看到下面的 NPE 异常:
java.lang.NullPointerException
at net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts$StoreTailer.inACycle(SingleChronicleQueueExcerpts.java:1198)
at net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts$StoreTailer.readingDocument(SingleChronicleQueueExcerpts.java:1000)
at net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts$StoreTailer.readingDocument(SingleChronicleQueueExcerpts.java:942)
at net.openhft.chronicle.wire.MarshallableIn.readText(MarshallableIn.java:95)
at com.test.edge.api.queue.TestDeleteQueueFile.testQueueFileDeletionWhileInUse(TestDeleteQueueFile.java:133)
要重现的测试用例如下:
package com.test.edge.api.queue;
import net.openhft.chronicle.core.time.SetTimeProvider;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.StoreFileListener;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class TestDeleteQueueFile {
private Path tempQueueDir;
@Before
public void setUp() throws Exception {
tempQueueDir = Files.createTempDirectory("unitTestQueueDir");
}
@After
public void tearDown() throws Exception {
FileUtils.deleteDirectory(tempQueueDir.toFile());
Assert.assertFalse(tempQueueDir.toFile().exists());
System.out.println("Deleted " + tempQueueDir.toFile().getAbsolutePath());
}
@Test
public void testQueueFileDeletionWhileInUse() throws Exception {
SetTimeProvider timeProvider = new SetTimeProvider();
String queueName = "unitTestQueue";
QueueStoreFileListener listener = new QueueStoreFileListener(queueName);
try (ChronicleQueue queue = SingleChronicleQueueBuilder.binary(tempQueueDir + "/" + queueName).
timeProvider(timeProvider).storeFileListener(listener)
.build()) {
ExcerptAppender appender = queue.acquireAppender();
System.out.println("first index : " + queue.firstIndex());
Assert.assertEquals(Long.MAX_VALUE, queue.firstIndex());
//write 10 records should go to first day file
for (int i = 0; i < 10; i++) {
appender.writeText("test");
}
long indexAfter10Records = appender.lastIndexAppended();
System.out.println("index after writing 10 records: " + indexAfter10Records);
//roll to next day file
timeProvider.advanceMillis(24 * 60 * 60 * 1000);
//write 5 records in next file
for (int i = 0; i < 5; i++) {
appender.writeText("test2");
}
Map<String, List> queueToRollFilesOnAcquireMap = listener.getQueueToRollFilesOnAcquireMap();
Map<String, List> queueToRollFilesOnReleaseMap = listener.getQueueToRollFilesOnReleaseMap();
Assert.assertEquals(1, queueToRollFilesOnAcquireMap.size());
List<String> files = queueToRollFilesOnAcquireMap.get(queueName);
Assert.assertEquals(1, files.size());
String secondFile = files.get(0);
//other will have 1 as only first file is released
files = queueToRollFilesOnReleaseMap.get(queueName);
Assert.assertEquals(1, files.size());
String firstFile = files.get(0);
Assert.assertNotEquals(firstFile, secondFile);
long firstIndex = queue.firstIndex();
long indexAfter5Records = appender.lastIndexAppended();
System.out.println("index after writing 5 records: " + indexAfter5Records);
//now lets create one reader which will read all content
ExcerptTailer excerptTailer = queue.createTailer();
for (int i = 0; i < 10; i++) {
Assert.assertEquals("test", excerptTailer.readText());
}
System.out.println("index after reading 10 records: " + excerptTailer.index());
Assert.assertEquals(firstIndex, excerptTailer.index() - 10);
for (int i = 0; i < 5; i++) {
Assert.assertEquals("test2", excerptTailer.readText());
}
System.out.println("index after reading 5 records: " + excerptTailer.index());
Assert.assertEquals(indexAfter5Records, excerptTailer.index() - 1);
//lets delete first file
System.out.println("Deleting first release file: " + firstFile);
Files.delete(Paths.get(firstFile));
long firstIndex3 = queue.firstIndex();
Assert.assertEquals(firstIndex, firstIndex3);
// and create a tailer it should only read
//data in second file
ExcerptTailer excerptTailer2 = queue.createTailer();
System.out.println("index before reading 5: " + excerptTailer2.index());
//AFTER CREATING A BRAND NEW TAILER, BELOW ASSERTION ALSO FAILS
//WAS EXPECTING THAT TAILER CAN READ FROM START OF QUEUE BUT INDEX IS LONG.MAX
//Assert.assertEquals(indexAfter5Records -5, excerptTailer2.index() -1);
//BELOW THROWS NPE, WAS EXPECTING THAT WE CAN READ FROM SECOND DAILY QUEUE FILE
System.out.println("excerptTailer2: " + excerptTailer2.peekDocument());
for (int i = 0; i < 5; i++) {
Assert.assertEquals("test2", excerptTailer2.readText());
}
//SAME ERROR WHEN CREATING A BRAND NEW QUEUE AND TRYING TO READ IT
// // create brand new queue and read see how it behaves
// ChronicleQueue queue2 = SingleChronicleQueueBuilder.binary(tempQueueDir + "/" + queueName).
// timeProvider(timeProvider).storeFileListener(listener)
// .build();
//
// long firstIndex2 = queue2.firstIndex();
// Assert.assertNotEquals(firstIndex, firstIndex2);
// Assert.assertNotEquals(indexAfter5Records, firstIndex2);
//
// ExcerptTailer excerptTailer3 = queue.createTailer();
// for (int i = 0; i < 5; i++) {
// Assert.assertEquals("test2", excerptTailer3.readText());
// }
}
}
final class QueueStoreFileListener implements StoreFileListener {
private String queueName;
private Map<String, List> queueToRollFilesOnReleaseMap = new HashMap<>();
private Map<String, List> queueToRollFilesOnAcquireMap = new HashMap<>();
public QueueStoreFileListener(String queueName) {
this.queueName = queueName;
}
@Override
public void onReleased(int cycle, File file) {
System.out.println("onReleased called cycle: " + cycle + "file: " + file);
List<String> files = queueToRollFilesOnReleaseMap.get(queueName);
if (files == null) {
files = new ArrayList<>();
}
String fileAbsPath = file.getAbsolutePath();
if (!files.contains(fileAbsPath)) {
files.add(fileAbsPath);
}
queueToRollFilesOnReleaseMap.put(queueName, files);
//update acquire file map
List<String> acqfiles = queueToRollFilesOnAcquireMap.get(queueName);
acqfiles.remove(file.getAbsolutePath());
queueToRollFilesOnAcquireMap.put(queueName, acqfiles);
}
@Override
public void onAcquired(int cycle, File file) {
System.out.println("onAcquired called cycle: " + cycle + "file: " + file);
List<String> files = queueToRollFilesOnAcquireMap.get(queueName);
if (files == null) {
files = new ArrayList<>();
}
String fileAbsPath = file.getAbsolutePath();
if (!files.contains(fileAbsPath)) {
files.add(fileAbsPath);
}
queueToRollFilesOnAcquireMap.put(queueName, files);
}
public Map<String, List> getQueueToRollFilesOnAcquireMap() {
return queueToRollFilesOnAcquireMap;
}
public Map<String, List> getQueueToRollFilesOnReleaseMap() {
return queueToRollFilesOnReleaseMap;
}
}
}