7

我需要生成 N 个消费者线程,它们同时处理相同的 InputStream,例如 - 以某种方式对其进行转换、计算校验和或数字签名等。这些消费者不相互依赖,并且它们都使用第三方库,这些库接受 InputStream 作为数据来源。

所以我能做的是 - 创建一些 InputStream 的实现,这将

  • 从“父”流中读取数据块
  • 解锁消费者
  • 等到每个消费者都读完整个块
  • 读取下一个块

虽然看起来很简单,但它可能会引发各种问题,例如某些消费者死亡时的活锁、实现所有 InputStream 方法、使用屏障/闩锁控制消费者自己的 fork/join 等。

一位哥们告诉我,实施需要半个小时,这让我度过了一个晚上。

我更喜欢使用足够成熟的东西(谷歌搜索没有结果,因此我的 google-fu 不够好?)或者不打扰并将整个“源”流复制到临时文件中并将其用作数据来源。后一种解决方案似乎更可靠,但最终可能会创建千兆字节文件(例如在处理流式音频时)。

4

3 回答 3

3

在我看来,您至少应该有某种缓冲,以便不同的消费者可以以不同的速度在流中移动,而不会一直被当前最慢的消费者拖累。这基本上确保了最坏情况下的性能和很少的并发收益。

例如,您可以用迄今为止使用过的消费者标记每个块,然后删除完全用完的消费者。也许这可以通过每个消费者持有对它尚未使用的每个块的引用来实现,这将允许 GC 自动处理已使用的块。生产者可能会WeakReference为这些块保留一个 s 列表,以便它可以处理尚未使用的块数量,并以此为基础进行节流。

我也在考虑InputStream每个线程有一个单独的实例,它在内部与生产者通信InputStream。这样,您就可以轻松解决活锁风险:try ... finally { is.close(); }- 垂死的消费者关闭自己的输入流。这将传达给生产者。

我对使用ArrayBlockingQueue每个消费者有一些想法。在不让生产者阻塞或忙等待的情况下,确保所有消费者都得到适当的喂食会有一些困难。

于 2012-07-04T20:26:04.467 回答
0

您是否考虑过使用管道流?您的生产者可以有一个或多个PipedOuputStream,它会在其中抛出从文件中读取的任何内容。在管道的另一端,您有不同的使用者线程读取相应的PipedInputstream(这是一个可以与您的库共享的 InputStream)。

您的生产者线程可以决定应该通过哪个管道发送数据,通过这种方式,为管道另一侧的给定消费者线程读取提供要处理的数据。

如果您需要从消费者线程中取回数据,那么您可以创建另一个管道,以相反的方向将数据发回给您。

于 2012-07-04T20:40:50.970 回答
0

您可以尝试一些 Java 消息服务 (JMS) 实现,例如Apache ActiveMQ

在您的情况下,您需要创建一个所谓的主题(请参阅主题与队列)。一个主题由生产者创建,并发布给 N 个消费者,这些消费者可以同时运行,每个消费者接收完全相同的数据。

由于您想使用InputStreams ,因此有一章关于如何发送消息是流

我想,通常情况下,生产者和消费者将是独立的进程,可能在网络上的不同机器上运行。不过,我认为您可以将其配置为完全在单个 JVM 中运行。这将取决于 JMS 的实现。这些也很有名:JBoss 的 HornetQRabbitMQ和一大堆其他人。

于 2012-07-06T15:35:19.477 回答