1

我正在实现一个线程池来处理大量市场数据馈送,并且对重用我的实现 runnable 的工作实例的策略有疑问,这些实例被提交到线程池执行。在我的情况下,我只有一种类型的工作人员,它接受一个字符串并对其进行解析以创建一个 Quote 对象,然后将其设置在正确的安全性上。鉴于来自提要的数据量,每秒可能处理超过 1,000 个报价,我看到了两种创建提交到线程池的工作线程的方法。

第一个选项是每次从底层套接字检索一行时简单地创建一个新的 Worker 实例,然后将其添加到线程池中,该线程池最终将在其 run 方法执行后被垃圾收集。但这让我开始思考性能问题,每秒实例化 1,0000 个 Worker 类的新实例真的有意义吗?本着与线程池相同的精神,人们是否知道拥有可运行池或队列是否是一种常见模式,以便我可以回收我的工作人员以避免对象创建和垃圾收集。我看到这个实现的方式是在返回 run() 方法之前,Worker 将自身添加回可用工作人员的队列中,然后在处理新的馈线时从该队列中提取,而不是创建新的 Worker 实例。

从性能的角度来看,采用第二种方法是否会有所收获,还是第一种方法更有意义?以前有人实施过这种模式吗?

谢谢 - 邓肯

4

5 回答 5

3

我会使用Executor并发包中的。我相信它会为你处理这一切。

于 2013-10-01T16:59:23.907 回答
3

为此,我使用了一个名为 Java Chronicle 的库。它旨在每秒保持和排队一百万个报价,而不会产生任何重大垃圾。

我在这里有一个演示,它以每秒 100 万条消息的速度发送带有纳秒计时信息的对象,它可以在 32 MB 堆的 JVM 中发送数千万条消息,而不会触发甚至是次要的收集。在我的超极本上,90% 的时间往返延迟小于 0.6 微秒。;)

从性能的角度来看,我通过第二种方法获得什么还是第一种更有意义?

我强烈建议不要用垃圾填充 CPU 缓存。事实上,我避免了任何会产生任何重大垃圾的构造。您可以构建一个系统,每个事件端到端创建少于一个对象。我有一个 Eden 大小,它比我一天产生的垃圾量还大,所以不用担心 GC 是次要的或满的。

以前有人实施过这种模式吗?

五年前,我用 Java 编写了一个可盈利的低延迟交易系统。当时它在 60 微秒的滴答声中足够快,可以用 Java 进行交易,但现在你可以做得比这更好。

如果你想要低延迟的市场数据处理系统,我就是这样做的。您可能会发现我在 JavaOne 上的演示文稿也很有趣。

http://www.slideshare.net/PeterLawrey/writing-and-testing-high-frequency-trading-engines-in-java


编辑我添加了这个解析示例

ByteBuffer wrap = ByteBuffer.allocate(1024);
ByteBufferBytes bufferBytes = new ByteBufferBytes(wrap);
byte[] bytes = "BAC,12.32,12.54,12.56,232443".getBytes();

int runs = 10000000;
long start = System.nanoTime();
for (int i = 0; i < runs; i++) {
    bufferBytes.reset();
    // read the next message.
    bufferBytes.write(bytes);
    bufferBytes.position(0);
    // decode message
    String word = bufferBytes.parseUTF(StopCharTesters.COMMA_STOP);
    double low = bufferBytes.parseDouble();
    double curr = bufferBytes.parseDouble();
    double high = bufferBytes.parseDouble();
    long sequence = bufferBytes.parseLong();
    if (i == 0) {
        assertEquals("BAC", word);
        assertEquals(12.32, low, 0.0);
        assertEquals(12.54, curr, 0.0);
        assertEquals(12.56, high, 0.0);
        assertEquals(232443, sequence);
    }
}
long time = System.nanoTime() - start;
System.out.println("Average time was " + time / runs + " nano-seconds");

当使用 -verbose:gc -Xmx32m 设置时,它会打印

Average time was 226 nano-seconds

注意:没有触发 GC。

于 2013-10-01T17:10:19.310 回答
1

每秒实例化 1,0000 个 Worker 类的新实例真的有意义吗?

不一定,但是您必须将Runnables 放入某种形式BlockingQueue以便能够重用,并且队列并发的成本可能超过 GC 开销。使用分析器或通过 Jconsole 观察 GC 编号将告诉您它是否在 GC 上花费了大量时间并且需要解决这个问题。

如果这确实是一个问题,另一种方法是将String您自己放入BlockingQueue并仅将对象提交Worker到线程池一次。每个Worker实例都会从 s 的队列中出列String并且永远不会退出。就像是:

public void run() {
    while (!shutdown) {
        String value = myQueue.take();
        ...
    }
}

因此,您无需Worker每秒创建 1000 秒。

于 2013-10-01T17:04:48.150 回答
1

是的,当然是这样的,因为 OS 和 JVM 不关心线程上发生了什么,所以通常这是重用可回收对象的好习惯。

于 2013-10-01T18:25:13.513 回答
0

我在您的问题中看到两个问题。一个是关于线程池的,另一个是关于对象池的。对于您的线程池问题,Java 提供了ExecutorService。下面是使用 ExecutorService 的示例。

Runnable r = new Runnable() {
    public void run() {
        //Do some work
    }
};

// Thread pool of size 2
ExecutorService executor = Executors.newFixedThreadPool(2);
// Add the runnables to the executor service
executor.execute(r);

ExecutorService 提供了许多不同类型的具有不同行为的线程池。

就对象池而言,(每秒创建 1000 个对象是否有意义,然后将它们留给垃圾收集,这一切都取决于对象的状态和费用。如果您担心自己的状态工作线程受到损害,您可以查看使用享元模式将您的状态封装在工作程序之外。此外,如果您要遵循享元模式,您还可以查看FutureCallable对象在您的应用程序架构中的用处.

于 2013-10-01T17:06:28.097 回答