0

我有一个看起来像这样的子系统:

        [read]          [decode]       [deliver] 
Byte      -->  Undecoded  -->   Decoded   -->  Output queue
stream          message         message     

输入是一个套接字/字节流。第一步是阅读消息。下一步是解码消息(并将结果存储在消息对象中)。最后一步是传递消息。

我想并行化解码步骤,但我必须保持输出顺序与输入顺序相同。因此,如果收到消息 A 和 B 并且消息 B 的解码速度更快,我必须等到 A 完成才能传递它。

我在 Java 中做了一个简单的初始实现,但我的分析表明我在切换步骤(从“流读取器”到“解码器”以及从“解码器”到输出)丢失了太多。在 24 核计算机(包括超线程)上运行测试程序时,我得到:

  • 运行单线程实现时为 1100 K msg/s。
  • 运行简单的 12 线程实现(有很多队列)时为 110 K msg/s。

我的幼稚实现可在http://pastebin.com/be1JqZy3获得。它有超过 200 行代码,所以它可能只会对那些真正想知道如何使并行版本比串行版本慢 10 倍的人感兴趣(提示:开始看类 ThreadPoolDecoder)。

在执行此类问题时,是否有人可以使用模式/框架,其中工作继续(基于流)可以并行化但必须在输出时进行排序?

4

2 回答 2

2

1100 K msg/s 非常快(消息不到 1 微秒)。该时间与从队列中放入/获取消息的时间相当(0.1...1 微秒)。因此,为了利用并行化,您必须将不间断处理的时间大大超过 1 微秒(例如,1 毫秒)。如果您将小消息组合成较大的消息,则可以做到这一点。在一个数据包中累积1000条消息,并将该数据包作为一个工作单元进行处理。并行处理单元。

于 2012-04-16T15:51:16.367 回答
2

我在我编写的程序(在 C# 中)中处理这个问题的方法是在输出上有一个优先级队列。每条记录都有一个在读取时分配的关联记录号。这些数字从 0 开始并增加。当一个线程处理完一条记录后,它会将记录添加到优先级队列中。

一个单独的输出线程有一个从零开始的预期记录号。该线程监视队列,等待添加预期的记录号。添加预期记录后,线程将其从队列中删除,输出,增加预期记录编号,然后重试。

这在我的应用程序中非常有效,四个线程处理记录,一个处理输出。

于 2012-04-16T15:09:51.740 回答