在下面的代码中,重新启动 tailer 进程没有问题。但是,重新启动 appender 进程会导致 tailer 无法再接收任何消息。有没有办法在重新启动 appender 的同时保持通道打开?
已编辑:以下是我用来稳定复现该问题的完整类。环境:Ubuntu 18,chronicle-queue-5.16.9.jar
1) java com.tradeplacer.util.IpcTest producer
2) java com.tradeplacer.util.IpcTest consumer
3)杀死生产者
4)重启生产者
5)可以看到消费者不再读取到任何消息
package com.tradeplacer.util;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ChronicleQueueBuilder;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;
/**
 * Two-process Chronicle Queue demo.
 *
 * <p>Run once with the argument {@code producer} to append a {@code "data" + i}
 * message roughly every millisecond, and once with any other argument to tail
 * the same queue directory and print every message appended after start-up.
 */
public class IpcTest {
    /** Queue directory shared by the producer and the consumer. */
    private static final String DIR = "chronicle-test";

    /** Size in bytes of the scratch buffer used on each side. */
    private static final int BUFFER_SIZE = 8192;

    /**
     * Starts a background thread that appends messages to the queue until the
     * loop counter is exhausted or the thread is interrupted.
     */
    public static final void startProducer() {
        new Thread("ipc-producer") {
            @Override
            public void run() {
                runProducer();
            }
        }.start();
    }

    /**
     * Starts a background thread that prints every message appended to the
     * queue after the consumer was started.
     */
    public static final void startConsumer() {
        new Thread("ipc-consumer") {
            @Override
            public void run() {
                runConsumer();
            }
        }.start();
    }

    /**
     * Entry point.
     *
     * @param args {@code args[0]} selects the role: {@code "producer"} starts
     *             the producer, any other value starts the consumer
     * @throws IllegalArgumentException if no role argument is supplied
     */
    public static void main(final String[] args) {
        if (args.length == 0) {
            throw new IllegalArgumentException(
                    "usage: java com.tradeplacer.util.IpcTest <producer|consumer>");
        }
        if ("producer".equals(args[0])) {
            startProducer();
        } else {
            startConsumer();
        }
    }

    /** Opens the shared queue with the settings both roles must agree on. */
    private static ChronicleQueue openQueue() {
        return ChronicleQueueBuilder.single(DIR)
                .blockSize(65536)
                .rollCycle(RollCycles.MINUTELY)
                .build();
    }

    /** Producer loop: writes one message, sleeps 1 ms, repeats. */
    private static void runProducer() {
        System.out.println("starting producer...");
        // NOTE(review): try-with-resources assumes ChronicleQueue.close() declares
        // no checked exception in the Chronicle version in use - confirm.
        try (ChronicleQueue queue = openQueue()) {
            ExcerptAppender appender = queue.acquireAppender();
            ByteBuffer ipcBuffer = ByteBuffer.allocate(BUFFER_SIZE);
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                ipcBuffer.clear();
                // Explicit charset so both processes agree regardless of platform default.
                ipcBuffer.put(("data" + i).getBytes(StandardCharsets.UTF_8));
                // NOTE(review): relies on wrapForWrite exposing the bytes put so far
                // (0..position) as the readable content - confirm against Chronicle Bytes docs.
                Bytes<ByteBuffer> bbb = Bytes.wrapForWrite(ipcBuffer);
                appender.writeBytes(bbb);
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop instead of swallowing it.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Consumer loop: polls the tailer and prints each message as text.
     * The loop busy-spins when no message is available.
     */
    private static void runConsumer() {
        System.out.println("starting consumer...");
        try (ChronicleQueue queue = openQueue()) {
            // Skip to the end so only messages appended from now on are read.
            ExcerptTailer tailer = queue.createTailer().toEnd();
            Bytes<?> bytes = Bytes.allocateDirect(BUFFER_SIZE);
            while (true) {
                try {
                    // Reset positions before every read; otherwise the write position
                    // only ever advances and the fixed-size buffer runs out of room.
                    bytes.clear();
                    if (!tailer.readBytes(bytes)) {
                        continue;
                    }
                    byte[] data = new byte[(int) bytes.readRemaining()];
                    bytes.read(data);
                    // Decode the payload; concatenating the array itself prints "[B@...".
                    System.out.println("read " + new String(data, StandardCharsets.UTF_8));
                } catch (Exception e) {
                    // Keep tailing after a failed read, but make the failure visible.
                    e.printStackTrace();
                }
            }
        }
    }
}