3

出于某种原因,我的 Akka 流总是在“发出”(?)第一条消息之前等待第二条消息。

这是一些演示我的问题的示例代码。

val rx = Source((1 to 100).toStream.map { t =>
  Thread.sleep(1000)
  println(s"doing $t")
  t
})
rx.runForeach(println)

产生输出:

doing 1
doing 2
1
doing 3
2
doing 4
3
doing 5
4
doing 6
5
...

我想要的是:

doing 1
1
doing 2
2
doing 3
3
doing 4
4
doing 5
5
doing 6
6
...
4

2 回答 2

5

您的代码现在设置的方式,您Source在允许开始向下游发射元素之前完全转换。toStream通过删除代表源的数字范围,您可以清楚地看到这种行为(如@slouc 所述) 。如果你这样做,你会看到Source它在开始响应下游需求之前首先完全转变。如果您确实想将 a 运行Source到 aSink并在中间进行转换步骤,那么您可以尝试构建如下内容:

val transform =
  Flow[Int].map{ t =>
    Thread.sleep(1000)
    println(s"doing $t")
    t
  }

Source((1 to 100).toStream).
  via(transform ).
  to(Sink.foreach(println)).
  run

如果您进行该更改,那么您将获得所需的效果,即在开始处理下一个元素之前,流向下游的元素一直通过流进行处理。

于 2016-07-10T19:06:56.550 回答
1

您正在使用.toStream()这意味着整个集合都是惰性的。没有它,您的输出将首先是一百个“doing”,然后是从 1 到 100 的数字。但是,Stream只计算第一个元素,它给出“doing 1”输出,这是它停止的地方。将在需要时评估下一个元素。

现在,我在文档中找不到关于此的任何详细信息,但我认为runForeach有一个实现,它在调用当前元素上的函数之前采用下一个元素。因此,在调用println元素 n 之前,它首先检查元素 n+1(例如检查它是否存在),这会导致“正在执行 n+1”消息。然后它println在当前元素上执行您的功能,从而产生消息 "n" 。

你真的需要map()在你面前吗runForeach?我的意思是,你需要两次遍历数据吗?我知道我可能是在说显而易见的,但如果你只是像这样一次性处理你的数据:

val rx = Source((1 to 100).toStream)
rx.runForeach({ t =>
  Thread.sleep(1000)
  println(s"doing $t")
  // do something with 't', which is now equal to what "doing" says
})

那么你就不会有什么时候评估的问题。

于 2016-07-10T18:32:38.720 回答