1

我正在使用 Chronicle Queue 4.5.27。

下面是使用 StoreFileListener 的 Writer 和 Reader 的简单实现。在 Reader 中,我收到了多个 onAcquired 和 onReleased 事件。

为什么会发生这种情况?我预计只会收到一次 Acquire(获取文件以供读取时)和一次 Release(读取完成后)。

在下面 Reader 的日志中,可以看到针对同一个文件出现了多次 onAcquired 和 onReleased 事件。

请注意,此行为是随机出现的。另请注意,Writer 已通过 Jvm.pause 被故意减慢速度,以模拟数据并非持续可用的真实系统。

import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.StoreFileListener;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;

import java.io.File;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * Builds {@link SingleChronicleQueue} instances whose {@link StoreFileListener}
 * prints a timestamped line to standard output every time a store file is
 * acquired or released.
 */
public class ChronicleFactory {

    /**
     * Timestamp format used for the listener's log lines.
     *
     * <p>The pattern contains only plain ASCII. The previous inline pattern had
     * invisible zero-width characters between {@code yyyy} and {@code -}, which
     * {@link DateTimeFormatter} copies verbatim into every formatted timestamp.
     * The formatter is immutable and thread-safe, so a single shared instance
     * is built once instead of on every callback.
     */
    private static final DateTimeFormatter LOG_TIMESTAMP =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    /**
     * Creates a binary queue rooted at {@code persistenceDir} with the given roll
     * cycle and a store-file listener that logs acquire/release notifications.
     *
     * @param instance       label supplied by the caller; not used by this method
     * @param persistenceDir directory passed to {@code SingleChronicleQueueBuilder.binary}
     * @param rollCycles     roll cycle applied to the builder
     * @return the built queue, or {@code null} if building threw an exception
     *         (the stack trace is printed and the failure is otherwise ignored,
     *         so callers must be prepared for {@code null})
     */
    public SingleChronicleQueue createChronicle(String instance, String persistenceDir, RollCycles rollCycles) {
        SingleChronicleQueue chronicle = null;
        try {
            chronicle = SingleChronicleQueueBuilder.binary(persistenceDir)
                    .rollCycle(rollCycles)
                    .storeFileListener(new StoreFileListener() {
                        @Override
                        public void onReleased(int cycle, File file) {
                            logFileEvent("onReleased", cycle, file);
                        }

                        @Override
                        public void onAcquired(int cycle, File file) {
                            logFileEvent("onAcquired", cycle, file);
                        }
                    })
                    .build();
        } catch (Exception e) {
            // Best-effort contract kept from the original: report and return null.
            e.printStackTrace();
        }
        return chronicle;
    }

    /**
     * Prints one log line of the form
     * {@code <timestamp>: <thread> <event> called for file: <path> for cycle: <cycle>}.
     */
    private static void logFileEvent(String event, int cycle, File file) {
        String currentTime = LocalDateTime.now().format(LOG_TIMESTAMP);
        System.out.println(currentTime + ": " + Thread.currentThread().getName()
                + " " + event + " called for file: " + file.getAbsolutePath()
                + " for cycle: " + cycle);
    }
}

import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import org.apache.commons.lang3.RandomStringUtils;

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Demo producer: appends a (long, int) pair of increasing sequence numbers to a
 * minutely-rolling Chronicle queue, pausing before each write.
 */
public class MarketDataWriter {
    // Not referenced anywhere in this class; retained as in the original.
    private static AtomicLong dataSeq = new AtomicLong();
    // Next values to be written; advanced after every read by the helpers below.
    private static long longSequence = 0;
    private static int intSequence = 0;

    /** Starts the writer against the hard-coded market-data directory. */
    public static void main(String[] args) {
        writeMarketData("C:\\Logs\\ChronicleData\\marketdata");
    }

    /**
     * Opens the queue at {@code path} through {@link ChronicleFactory} and appends
     * records forever. This method never returns normally.
     *
     * @param path directory of the queue to write to
     */
    private static void writeMarketData(String path) {
        final ChronicleFactory factory = new ChronicleFactory();
        final SingleChronicleQueue queue = factory.createChronicle("MD", path, RollCycles.MINUTELY);
        final ExcerptAppender appender = queue.acquireAppender();

        for (;;) {
            // Deliberately slow the writer down so file rolling can be observed.
            Jvm.pause(100);
            appender.writeBytes(out -> {
                out.writeLong(nextLongSequence());
                out.writeInt(nextIntSequence());
            });
        }
    }

    /** Returns the current long sequence value, then increments it. */
    private static long nextLongSequence() {
        final long current = longSequence;
        longSequence = current + 1;
        return current;
    }

    /** Returns the current int sequence value, then increments it. */
    private static int nextIntSequence() {
        final int current = intSequence;
        intSequence = current + 1;
        return current;
    }
}

import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Demo consumer: tails the market-data queue on the calling thread and discards
 * every record it reads, while a second reader task runs on an executor thread.
 */
public class SimpleMarketDataReader {

    private static final ExecutorService executor = Executors.newCachedThreadPool();

    /** Starts the reader against the hard-coded market-data directory. */
    public static void main(String[] args) {
        readMarketData("C:\\Logs\\ChronicleData\\marketdata");
    }

    /**
     * Opens the queue at {@code pathForMarketDataFile}, submits a second reader
     * task to the executor, then polls a tailer in an endless loop.
     *
     * <p>Any exception thrown while reading is printed and ends the loop.
     *
     * @param pathForMarketDataFile directory of the queue to read from
     */
    public static void readMarketData(String pathForMarketDataFile) {
        final ChronicleFactory factory = new ChronicleFactory();
        final SingleChronicleQueue queue =
                factory.createChronicle("Reader", pathForMarketDataFile, RollCycles.MINUTELY);

        // Second reader on its own thread; per the original comment it reads the
        // same file (that class is defined elsewhere).
        final SimpleMarketDataReaderNewChronicle secondReader = new SimpleMarketDataReaderNewChronicle();
        executor.submit(secondReader);

        final ExcerptTailer tailer = queue.createTailer();
        try {
            for (;;) {
                // Consume one record: a long followed by an int; both are discarded.
                tailer.readBytes(in -> {
                    in.readLong();
                    in.readInt();
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Writer 输出

2018-01-03 09:36:00.079: main onAcquired called for file: C:\Logs\ChronicleData\marketdata\20180103-0936.cq4 for cycle: 25249536

2018-01-03 09:37:00.098: main onReleased called for file: C:\Logs\ChronicleData\marketdata\20180103-0936.cq4 for cycle: 25249536

Reader 输出

2018-01-03 09:36:00.065: main onAcquired called for file: C:\Logs\ChronicleData\marketdata\20180103-0936.cq4 for cycle: 25249536

2018-01-03 09:36:00.075: main onReleased called for file: C:\Logs\ChronicleData\marketdata\20180103-0936.cq4 for cycle: 25249536

2018-01-03 09:36:00.078: main onAcquired called for file: C:\Logs\ChronicleData\marketdata\20180103-0936.cq4 for cycle: 25249536

2018-01-03 09:36:00.082: main onReleased called for file: C:\Logs\ChronicleData\marketdata\20180103-0936.cq4 for cycle: 25249536

2018-01-03 09:36:00.086: main onAcquired called for file: C:\Logs\ChronicleData\marketdata\20180103-0936.cq4 for cycle: 25249536

2018-01-03 09:37:00.103: main onReleased called for file: C:\Logs\ChronicleData\marketdata\20180103-0936.cq4 for cycle: 25249536

4

0 回答 0