12

我试图理解 和 之间的asyncBoundary区别mapAsync。乍一看,我想它们应该是一样的。但是,当我运行代码时,它的性能看起来asyncBoundarymapAsync

这是代码

implicit val system = ActorSystem("sourceDemo")
implicit val materializer = ActorMaterializer()


Source(1 to 100).mapAsync(100)(t => Future {t + 1}).mapAsync(100)(t => Future {t * 2}).map(println).to(Sink.ignore).run()
Source(1 to 100).map(_ + 1).withAttributes(Attributes.asyncBoundary).map(_ * 2).map(t => println("async boundary", t)).to(Sink.ignore).run()

输出:异步边界总是比 mayAsync 更快地完成。

从关于 asyncBoundary 的文档描述(https://doc.akka.io/docs/akka-stream-and-http-experimental/current/scala/stream-flows-and-basics.html),我可以看到它正在运行在不同的 CPU 上,但 mapAsync 使用 Future 是多线程的。Future 也是异步的。

我可以要求更多关于这两个 API 的说明吗?

4

1 回答 1

14

异步

正如您正确指出的那样,这会强制在两个阶段之间插入异步边界。在你的例子中

Source(1 to 100).map(_ + 1).withAttributes(Attributes.asyncBoundary).map(_ * 2).map(t => println("async boundary", t)).to(Sink.ignore).run()

这实际上意味着+ 1操作和* 2操作将由不同的参与者运行。这使得流水线化成为可能,当一个元素移动到* 2舞台上时,同时另一个元素可以被引入+ 1舞台。如果您不在那里强制设置异步边界,则同一参与者将按顺序执行操作并在一个元素上执行操作,然后再从上游请求一个新元素。

顺便说一句,您的示例可以使用async组合器以更短的格式重写:

Source(1 to 100).map(_ + 1).async.map(_ * 2).map(t => println("async boundary", t)).to(Sink.ignore).run()

地图异步

这是一个并行执行异步操作的阶段。并行度因子允许您指定要启动以服务传入元素的并行参与者的最大数量。并行计算的结果由stage按顺序跟踪和发出。mapAsync

在你的例子中

Source(1 to 100).mapAsync(100)(t => Future {t + 1}).mapAsync(100)(t => Future {t * 2}).map(println).to(Sink.ignore).run()

可能多达 100 个+ 1操作(即所有操作)可以并行运行,并按顺序收集结果。随后,最多* 2可以并行运行 100 个操作,并且再次按顺序收集结果并发送到下游。

在您的示例中,您正在运行受 CPU 限制的快速操作,这些操作无法证明使用是合理mapAsync的,因为此阶段所需的基础设施很可能比并行运行 100 个这些操作的优势要昂贵得多。mapAsync在处理 IO-bound、缓慢的操作时特别有用,其中并行化非常方便。

如需全面了解此主题,请查看此博文

于 2017-11-15T09:55:57.247 回答