我需要做一些与此非常相似的事情https://github.com/typesafehub/activator-akka-stream-scala/blob/master/src/main/scala/sample/stream/GroupLogFile.scala
我的问题是我有一个未知数量的组,如果 mapAsync 的并行数少于我得到的组数和最后一个接收器中的错误
由于上游错误(akka.stream.impl.StreamSubscriptionTimeoutSupport$$anon$2)而拆除 SynchronousFileSink(/Users/sam/dev/projects/akka-streams/target/log-ERROR.txt)
我尝试按照 akka 流的模式指南http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-cookbook.html中的建议在中间放置一个缓冲区
groupBy {
case LoglevelPattern(level) => level
case other => "OTHER"
}.buffer(1000, OverflowStrategy.backpressure).
// write lines of each group to a separate file
mapAsync(parallelism = 2) {....
但结果相同