5

我需要做一些与此非常相似的事情https://github.com/typesafehub/activator-akka-stream-scala/blob/master/src/main/scala/sample/stream/GroupLogFile.scala

我的问题是我有一个未知数量的组,如果 mapAsync 的并行数少于我得到的组数和最后一个接收器中的错误

由于上游错误(akka.stream.impl.StreamSubscriptionTimeoutSupport$$anon$2)而拆除 SynchronousFileSink(/Users/sam/dev/projects/akka-streams/target/log-ERROR.txt)

我尝试按照 akka 流的模式指南http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-cookbook.html中的建议在中间放置一个缓冲区

groupBy {
  case LoglevelPattern(level) => level
  case other                  => "OTHER"
}.buffer(1000, OverflowStrategy.backpressure).
  // write lines of each group to a separate file
  mapAsync(parallelism = 2) {....

但结果相同

4

1 回答 1

4

扩展 jrudolph 的评论,这是完全正确的......

mapAsync在这种情况下,您不需要 a 。作为一个基本示例,假设您有一个元组源

import akka.stream.scaladsl.{Source, Sink}

def data() = List(("foo", 1),
                  ("foo", 2),
                  ("bar", 1),
                  ("foo", 3),
                  ("bar", 2))

val originalSource = Source(data)

然后您可以执行 groupBy 以创建Source of Sources

def getID(tuple : (String, Int)) = tuple._1

//a Source of (String, Source[(String, Int),_])
val groupedSource = originalSource groupBy getID

每个分组的 Source 都可以只用一个 并行处理map,不需要任何花哨的东西。以下是每个分组在独立流中相加的示例:

import akka.actor.ActorSystem
import akka.stream.ACtorMaterializer

implicit val actorSystem = ActorSystem()
implicit val mat = ActorMaterializer()
import actorSystem.dispatcher

def getValues(tuple : (String, Int)) = tuple._2

//does not have to be a def, we can re-use the same sink over-and-over
val sumSink = Sink.fold[Int,Int](0)(_ + _)

//a Source of (String, Future[Int])
val sumSource  = 
  groupedSource map { case (id, src) => 
    id -> {src map getValues runWith sumSink} //calculate sum in independent stream
  }

现在所有"foo"数字都与所有"bar"数字并行相加。

mapAsync当您有一个返回 a 的封装函数Future[T]并且您尝试发出 a 时T使用;在你的问题中情况并非如此。此外, mapAsync 涉及等待反应性的结果......

于 2015-11-27T14:30:08.040 回答