我使用 CQ v5.16.11(带有 openjdk 11)来保存每日滚动周期的数据。这个过程从周日到周五不间断地运行,所以我每周有 5 个 cq4 文件。我运行了 1.5 周,有 8 个文件(第 1 周 3 个,第 2 周 5 个)。
所以我拥有的文件是:
20181003.cq4 cycle=17807,
20181004.cq4 cycle=17808,
20181005.cq4 cycle=17809,
20181007.cq4 cycle=17811,
20181008.cq4 cycle=17812,
20181009.cq4 cycle=17813,
20181010.cq4 cycle=17814,
20181011.cq4 cycle=17815,
请注意 20181006.cq4 (cycle=17810) 的缺失文件,因为该过程在星期六不运行。
我使用此代码读取数据:
tailer.toEnd();
lastTailerIndex = tailer.index();
tailer.toStart();
while (tailer.index() <= lastTailerIndex) {
// read data
if (tailer.readBytes(data) {
/// do something with data bytes
}
if (tailer.index() == lastTailerIndex) {
break;
}
}
这会正确读取第一周的数据,但不会读取第二周的数据,因为它不会自动滚动到下一个周期。
知道为什么会发生这种情况或如何解决这个问题吗?
这个问题与旧版本的问题类似
日志:
2018-10-12 12:41:15,784 DEBUG [main] net.openhft.chronicle.bytes.MappedFile - Allocation of 0 chunk in /site/data/metadata.cq4t took 19.237 ms.
2018-10-12 12:41:15,876 DEBUG [main] net.openhft.chronicle.bytes.MappedFile - Allocation of 0 chunk in /site/data/20181011.cq4 took 0.063 ms.
2018-10-12 12:41:15,881 DEBUG [main] net.openhft.chronicle.queue.impl.single.PretoucherState - /site/data/20181011.cq4 - Reset pretoucher to pos 4835096 as the underlying MappedBytes changed.
2018-10-12 12:41:15,887 DEBUG [main] net.openhft.chronicle.bytes.MappedFile - Allocation of 0 chunk in /site/data/20181003.cq4 took 0.065 ms.
2018-10-12 12:41:15,995 DEBUG [main] net.openhft.chronicle.bytes.MappedFile - Allocation of 0 chunk in /site/data/20181011.cq4 took 0.082 ms.
2018-10-12 12:41:15,996 DEBUG [main] net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder - File released /site/data/20181003.cq4
2018-10-12 12:41:15,997 DEBUG [main] net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder - File released /site/data/20181011.cq4
2018-10-12 12:41:16,418 DEBUG [main] net.openhft.chronicle.bytes.MappedFile - Allocation of 0 chunk in /site/data/20181004.cq4 took 0.112 ms.
2018-10-12 12:41:16,418 DEBUG [main] net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder - File released /site/data/20181003.cq4
2018-10-12 12:41:16,813 DEBUG [main] net.openhft.chronicle.bytes.MappedFile - Allocation of 0 chunk in /site/data/20181005.cq4 took 0.084 ms.
2018-10-12 12:41:16,813 DEBUG [main] net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder - File released /site/data/20181004.cq4
[编辑 1]:
上周末也发生了同样的事情,即,正如预期的那样,10 月 13 日没有新文件。现在我有 10 月 7 日到 10 月 15 日的文件(缺少 10 月 13 日的文件)。如果我这样做tailer.toStart(); while(tailer.readBytes() { ...}
,它只会从 10 月 7 日到 10 月 12 日读取文件,而不会读取 10 月 14 日和 15 日的文件。
[编辑 2]:将问题复制如下 Chronicle-Queue/issues/537
- 设置/库: jvm openjdk 11、Ubuntu 16.04、openhft.affinity/3.1.9、chronicle-map/3.16.0、chronicle-queue/5.16.11、chronicle-bytes/1.16.23、chronicle-core/1.16.20 ,编年史线/1.16.16,编年史线程/1.16.3,jna/4.4.0
- 脚步:
- 启动 WriterProcess - 让它完成。
- 启动 ReaderProcess - 查看 5 个打印语句。
- 停止 ReaderProcess
- 等待一段时间 - 10 分钟。
- 再次启动 WriterProcess - 让它完成或继续运行这个过程。
- 启动 ReaderProcess - 它只打印前 5 个打印语句,此后不打印任何内容。即使 WriterProcess 正在运行/写入以排队,该进程中的尾部也不会向前移动。
public class WriterProcess {
public static void main(String[] args) throws InterruptedException {
final String dir = "/tmp/demo/";
final LocalTime localTime = LocalTime.of(17, 0);
final ZoneId zoneID = ZoneId.of("America/New_York");
final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
final SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(dir)
.blockSize((long) Math.pow(2, 23))
.rollCycle(RollCycles.MINUTELY)
.rollTime(localTime, zoneID)
.build();
final ExcerptAppender appender = queue.acquireAppender();
// pre touch
scheduledExecutorService.scheduleAtFixedRate(appender::pretouch,0,30, TimeUnit.SECONDS);
// write data
System.out.println("writing data ...");
writeData(appender, 5);
// close queue
System.out.println("shutting down now ...");
queue.close();
scheduledExecutorService.shutdown();
scheduledExecutorService.awaitTermination(1, TimeUnit.SECONDS);
}
public static void writeData(ExcerptAppender appender, int count) {
int ctr = 0;
String dateStr;
Date date = new Date();
while (true) {
dateStr = date.toString();
appender.writeText("["+ctr+"] Written " + dateStr);
System.out.println("["+ctr+"] Written " + dateStr);
ctr++;
if (ctr >= count) {
break;
}
try {
Thread.sleep(65_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class ReaderProcess {
public static void main(String[] args) {
final String dir = "/tmp/demo/";
final LocalTime localTime = LocalTime.of(17, 0);
final ZoneId zoneID = ZoneId.of("America/New_York");
final SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(dir)
.blockSize((long) Math.pow(2, 23))
.rollCycle(RollCycles.MINUTELY)
.rollTime(localTime, zoneID)
.build();
final ExcerptTailer tailer = queue.createTailer();
tailer.toStart();
// read data
System.out.println("reading data ...");
readData(tailer, 25);
// close
System.out.println("shutting down now ...");
queue.close();
}
public static void readData(ExcerptTailer tailer, int count) {
int ctr = 0;
Bytes data = Bytes.allocateDirect(new byte[500]);
while (true) {
if (tailer.readBytes(data)) {
System.out.println("["+ctr+"] Read {"+ data + "}");
ctr++;
if (ctr >= count) {
break;
}
}
}
}
}