1

我有多个线程正在写入同一个队列的场景。

Appender 线程接收来自不同市场(每个线程单个市场)的更新并将这些数据推送到同一个队列中:

ChronicleQueue queue = SingleChronicleQueueBuilder.binary(path + "/market").build();
        final ExcerptTailer tailer = queue.createTailer();
appender.writeDocument(
                        wire -> {

                                wire
                                        .getValueOut().text("buy")
                                        .getValueOut().text(exchange.name())
                                        .getValueOut().text(currencyPair.toString())
                                        .getValueOut().dateTime(LocalDateTime.now(Clock.systemUTC()))
                                        .getValueOut().text(price);
                            });

然后我有完全独立的进程(不同的JVM)通过执行从队列中连续读取:

while (true){
     tailer.readDocument(........

但是当我每秒生成大约 10 次队列更新时,tailer 进程大约每 3 秒处理一条记录。我想我在这里遗漏了一些基本的东西:-)

或者持续监听队列更新的正确方法是什么?除了while (true) then do...之外,我找不到任何其他解决方案...

我正在 18 核机器(36 个线程)上开发并使用 Java Affinity 将每个工作分配给它自己的 CPU。

感谢您的任何提示。

4

1 回答 1

2

创建队列非常昂贵,如果可以,请尝试每个进程只执行一次。

创建 Tailer 也很昂贵,您应该创建一次并继续轮询更新。

创建对象可能很昂贵,我会避免创建任何对象。例如避免打电话toStringLocalDate.now

这是一个基准测试的例子

String path = OS.getTarget();
ChronicleQueue queue = SingleChronicleQueueBuilder.binary(path + "/market").build();
ExcerptAppender appender = queue.acquireAppender();
Exchange exchange = Exchange.EBS;
CurrencyPair currencyPair = CurrencyPair.EURUSD;
double price = 1.2345;
for (int t = 0; t < 5; t++) {
    long start = System.nanoTime();
    int messages = 100000;
    for (int i = 0; i < messages; i++) {
        try (DocumentContext dc = appender.writingDocument()) {
            ValueOut valueOut = dc.wire().getValueOut();
            valueOut.text("buy")
                    .getValueOut().asEnum(exchange)
                    .getValueOut().asEnum(currencyPair)
                    .getValueOut().int64(System.currentTimeMillis())
                    .getValueOut().float64(price);
        }
    }
    long time = System.nanoTime() - start;
    System.out.printf("Throughput was %,d messages per second%n", (long) (messages * 1e9 / time));
    Jvm.pause(100);
}

印刷

Throughput was 962,942 messages per second
Throughput was 2,952,433 messages per second
Throughput was 4,776,337 messages per second
Throughput was 3,250,235 messages per second
Throughput was 3,514,863 messages per second

而对于阅读你可以做

final ExcerptTailer tailer = queue.createTailer();
for (int t = 0; t < 5; t++) {
    long start = System.nanoTime();
    int messages = 100000;
    for (int i = 0; i < messages; i++) {
        try (DocumentContext dc = tailer.readingDocument()) {
            if (!dc.isPresent())
                throw new AssertionError("Missing t: " + t + ", i: " + i);
            ValueIn in = dc.wire().getValueIn();
            String buy = in.text();
            Exchange exchange2 = in.asEnum(Exchange.class);
            CurrencyPair currencyPair2 = in.asEnum(CurrencyPair.class);
            long time = in.int64();
            double price2 = in.float64();
        }
    }
    long time = System.nanoTime() - start;
    System.out.printf("Read Throughput was %,d messages per second%n", (long) (messages * 1e9 / time));
}

注意:它读取与写入相同数量的消息。

印刷

Read Throughput was 477,849 messages per second
Read Throughput was 3,083,642 messages per second
Read Throughput was 5,100,516 messages per second
Read Throughput was 6,342,525 messages per second
Read Throughput was 6,672,971 messages per second
于 2018-01-09T19:21:48.717 回答