7

我最初在这里问了这个问题,但我意识到我的问题不是关于 while-true 循环。我想知道的是,在 Java 中进行高性能异步消息传递的正确方法是什么?

我正在尝试做的...

我有大约 10,000 个消费者,每个消费者都从他们的私人队列中消费消息。我有一个线程一一生成消息并将它们放入正确的消费者队列中。每个消费者无限循环,检查消息是否出现在其队列中并处理它。

我相信这个术语是“单一生产者/单一消费者”,因为有一个生产者,每个消费者只在他们的私有队列上工作(多个消费者从不从同一个队列中读取)。

Consumer.java 内部:

@Override
public void run() {
    while (true) {
        Message msg = messageQueue.poll();
        if (msg != null) {
            ... // do something with the message
        }
    }
}

生产者正在快速将消息放入消费者消息队列中(每秒数百万条消息)。消费者应该尽快处理这些消息!

注意:while (true) { ... }由 Producer 作为其最后一条消息发送的 KILL 消息终止。

但是,我的问题是关于设计此消息传递的正确方法。我应该为 messageQueue 使用哪种队列?它应该是同步的还是异步的?Message应该如何设计?我应该使用 while-true 循环吗?消费者应该是一个线程还是其他什么?10,000 个线程会慢到爬行吗?线程的替代品是什么?

那么,在 Java 中进行高性能消息传递的正确方法是什么?

4

5 回答 5

6

我会说 10,000 个线程的上下文切换开销会非常高,更不用说内存开销了。默认情况下,在 32 位平台上,每个线程使用 256kb 的默认堆栈大小,因此只有 2.5GB 用于您的堆栈。显然你说的是 64 位,但即便如此,那还是相当大的内存量。由于所使用的内存量,缓存将被大量使用,并且 cpu 将受到内存带宽的限制。

我会寻找一种避免使用这么多线程的设计,以避免分配大量的堆栈和上下文切换开销。您不能同时处理 10,000 个线程。当前的硬件通常具有少于 100 个内核。

我会为每个硬件线程创建一个队列并以循环方式发送消息。如果处理时间差异很大,则存在一些线程在获得更多工作之前完成处理队列的危险,而其他线程则永远无法完成分配的工作。这可以通过使用 JSR-166 ForkJoin 框架中实现的工作窃取来避免。

由于通信是从发布者到订阅者的一种方式,因此 Message 不需要任何特殊设计,假设订阅者在发布后不会更改消息。

编辑:阅读评论,如果您有 10,000 个符号,则创建一些通用订阅者线程(每个核心一个订阅者线程),它们异步接收来自发布者的消息(例如,通过他们的消息队列)。订阅者从队列中拉出消息,从消息中检索符号,并在消息处理程序的 Map 中查找它,检索处理程序,并调用处理程序以同步处理消息。完成后,它会重复,从队列中获取下一条消息。如果必须按顺序处理相同符号的消息(这就是我猜您想要 10,000 个队列的原因。),您需要将符号映射到订阅者。例如,如果有 10 个订阅者,则符号 0-999 到订阅者 0,1000-1999 到订阅者 1 等等。更精细的方案是根据符号的频率分布映射符号,以便每个用户获得大致相同的负载。例如,如果 10% 的流量是符号 0,那么订阅者 0 将只处理那个符号,而其他符号将分配给其他订阅者。

于 2010-07-29T22:29:06.960 回答
2

您可以使用它(归功于我应该使用 Java 中的哪个线程池?):

class Main {
    ExecutorService threadPool = Executors.newFixedThreadPool(
                                     Runtime.availableProcessors()*2);

    public static void main(String[] args){
        Set<Consumer> consumers = getConsumers(threadPool);
        for(Consumer consumer : consumers){
            threadPool.execute(consumer);
        }
    }
}

class Consumer {
    private final ExecutorService tp;
    private final MessageQueue messageQueue;
    Consumer(ExecutorService tp,MessageQueue queue){
        this.tp = tp;
        this.messageQueue = queue;
    }
    @Override
    public void run(){
              Message msg = messageQueue.poll();

              if (msg != null) {
                  try{
                       ... // do something with the message
                  finally{
                       this.tp.execute(this);
                  }
              }
           }
    }
}    

这样,您可以轻松安排好日程。

于 2010-07-29T23:45:59.393 回答
1

首先,除非您放置完整的设计文档或自己尝试不同的方法,否则没有一个正确的答案。

我假设您的处理不会是计算密集型的,否则您不会考虑同时处理 10000 个队列。一种可能的解决方案是通过每个 CPU 拥有一两个线程来最小化上下文切换。除非您的系统将严格实时处理数据,否则可能会给每个队列带来更大的延迟,但总体吞吐量会更高。

例如——让您的生产者线程在其自己的 CPU 上运行,并将批量消息发送到消费者线程。然后每个消费者线程将消息分发到它的 N 个私有队列,执行处理步骤,接收新的数据批次等等。同样,这取决于您的延迟容限,因此处理步骤可能意味着处理所有队列、固定数量的队列、尽可能多的队列,除非达到时间阈值。能够轻松分辨哪个队列属于哪个消费者线程(例如,如果队列按顺序编号:int consumerThreadNum = queueNum & 0x03)将是有益的,因为每次在哈希表中查找它们可能会很慢。

为了最大限度地减少内存抖动,始终创建/销毁队列可能不是一个好主意,因此您可能希望为每个线程预先分配一个(最大队列数/核心数)队列对象。当队列完成而不是被销毁时,它可以被清除和重用。您不希望 gc 经常和太长时间妨碍您。

另一个未知数是您的生产者是否为每个队列生成完整的数据集,或者会以块的形式发送数据,直到收到 KILL 命令。如果您的生产者发送完整的数据集,您可以完全取消队列概念,并在数据到达消费者线程时对其进行处理。

于 2010-07-29T23:00:37.100 回答
0

拥有一个相对于硬件和操作系统容量的消费者线程池。这些消费者线程可以轮询您的消息队列。

我要么让消息知道如何处理自己,要么在初始化时向消费者线程类注册处理器。

于 2010-07-29T22:33:11.503 回答
0

由于没有关于处理符号的约束的更多细节,很难给出非常具体的建议。

你应该看看这篇 slashdot 文章:

http://developers.slashdot.org/story/10/07/27/1925209/Java-IO-Faster-Than-NIO

它有很多关于多线程、单选和线程池参数的讨论和实际测量数据。

于 2010-07-29T23:06:02.843 回答