我想将最新的与 Akka Streams 结合起来,如此处所述。
我不知道该怎么做-请帮忙!
谢谢,瑞恩。
我只是快速实施它。不确定它是否没有错误,但值得一试:) https://gist.github.com/tg44/2e75d45c234ca02d91cfdac35f41a5a2 欢迎在要点下发表评论!
正如我们在 gitter 频道上所说,它无法通过构建阶段实现,但您可以使用自定义阶段编写功能。您将需要两个输入和一个输出(可以扩展到 N 个输入),因此它是一个扇形。
我将传入的元素保存到选项中,并且只要输入准备好(也就是发送元素),我就会将给定的元素保存到选项中。每当输出需要一个元素(并且我们已经从两个输入中都有一个元素)时,我将选项中的值作为元组提供给它。这是背压感知方法。
对于背压方法(生成所有对),您需要处理等待“其他”输出元素然后是最后一个,并且需要处理输入拉动。我认为我的实现仍然无法处理速度过快的生产者和缓慢的消费者情况(我们可能会错过一个元素,可以使用 emits 处理),并且如果两个输入多次产生相同的元素(也许 emits 也可以处理),则可能会死锁。
如果您想扩展我的代码功能或想编写其他自定义阶段,请阅读:http ://doc.akka.io/docs/akka/2.5/scala/stream/stream-customize.html