我正在使用编年史 4.5.27。
下面是使用 StoreFileListner 的 Writer 和 Reader 的简单实现。在阅读器中,我收到了多个 onAcquired 和 onReleased 事件。
为什么会发生这种情况?我预计只会收到一份 Acquire(获取文件以供读取时)和一份 Release(读取完成后)。
在 Reader 的以下日志中,一次可以看到多个 onAcquired 和 onReleased 事件。
请注意,此行为是随机的。另请注意 Writer 已通过 Jvm.pause 故意减慢速度,以模拟数据可能无法连续使用的真实系统。
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.StoreFileListener;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import java.io.File;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
public class ChronicleFactory {
public SingleChronicleQueue createChronicle(String instance, String persistenceDir, RollCycles rollCycles) {
SingleChronicleQueue chronicle = null;
try {
chronicle = SingleChronicleQueueBuilder.binary(persistenceDir).rollCycle(rollCycles).storeFileListener(new StoreFileListener() {
@Override
public void onReleased(int i, File file) {
String currentTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
System.out.println(currentTime + ": " + Thread.currentThread().getName() + " onReleased called for file: " + file.getAbsolutePath() + " for cycle: " + i);
}
@Override
public void onAcquired(int cycle, File file) {
String currentTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
System.out.println(currentTime + ": " + Thread.currentThread().getName() + " onAcquired called for file: " + file.getAbsolutePath() + " for cycle: " + cycle);
}
}).build();
} catch (Exception e) {
e.printStackTrace();
}
return chronicle;
}
}
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import org.apache.commons.lang3.RandomStringUtils;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
public class MarketDataWriter {
private static AtomicLong dataSeq = new AtomicLong();
private static long longSequence = 0;
private static int intSequence = 0;
public static void main(String args[]) {
String path = "C:\\Logs\\ChronicleData\\marketdata";
writeMarketData(path);
}
private static void writeMarketData(String path) {
ChronicleFactory chronicleFactory = new ChronicleFactory();
SingleChronicleQueue chronicle = chronicleFactory.createChronicle("MD", path, RollCycles.MINUTELY);
ExcerptAppender appender = chronicle.acquireAppender();
while (true) {
Jvm.pause(100); //NOTE: Slowing down writer to understand file rolling
appender.writeBytes(b -> {
b.writeLong(getLongSequence());
b.writeInt(getIntSequence());
});
}
}
private static long getLongSequence() {
return longSequence++;
}
private static int getIntSequence() {
return intSequence++;
}
}
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SimpleMarketDataReader {
private static final ExecutorService executor = Executors.newCachedThreadPool();
public static void main(String args[]) {
String pathForMarketData = "C:\\Logs\\ChronicleData\\marketdata";
readMarketData(pathForMarketData);
}
public static void readMarketData(String pathForMarketDataFile) {
ChronicleFactory chronicleFactory = new ChronicleFactory();
SingleChronicleQueue chronicle = chronicleFactory.createChronicle("Reader", pathForMarketDataFile, RollCycles.MINUTELY);
//Create another thread to read same file
SimpleMarketDataReaderNewChronicle simpleMarketDataReaderNewChronicle = new SimpleMarketDataReaderNewChronicle();
executor.submit(simpleMarketDataReaderNewChronicle);
ExcerptTailer tailer = chronicle.createTailer();
try {
while (true) {
tailer.readBytes(b -> {
b.readLong();
b.readInt();
//System.out.println("Long Sequence in SimpleMarketDataReader: " + b.readLong());
//System.out.println("User data is: " + userData);
//System.out.println("Int Sequence is: " + b.readInt());
});
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
作家输出:
2018-01-03 09:36:00.079:主要 onAcquired 调用文件:C:\Logs\ChronicleData\marketdata\20180103-0936.cq4for 周期:25249536
2018-01-03 09:37:00.098:主要 onReleased 调用文件:C:\Logs\ChronicleData\marketdata\20180103-0936.cq4for 周期:25249536
阅读器输出:
2018-01-03 09:36:00.065:主要 onAcquired 调用文件:C:\Logs\ChronicleData\marketdata\20180103-0936.cq4for 周期:25249536
2018-01-03 09:36:00.075:主要 onReleased 调用文件:C:\Logs\ChronicleData\marketdata\20180103-0936.cq4for 周期:25249536
2018-01-03 09:36:00.078:主要 onAcquired 调用文件:C:\Logs\ChronicleData\marketdata\20180103-0936.cq4for 周期:25249536
2018-01-03 09:36:00.082:主要 onReleased 调用文件:C:\Logs\ChronicleData\marketdata\20180103-0936.cq4for 周期:25249536
2018-01-03 09:36:00.086:主要 onAcquired 调用文件:C:\Logs\ChronicleData\marketdata\20180103-0936.cq4for 周期:25249536
2018-01-03 09:37:00.103:主要 onReleased 调用文件:C:\Logs\ChronicleData\marketdata\20180103-0936.cq4for 周期:25249536