1

这一定很简单。但由于某种原因,我无法让它工作。

  • 如果我这样做io.linesR(...),我有一个文件行流,没关系。
  • 如果我这样做Processor.emitAll(),我有一个预定义的值流。它也有效。

但我真正需要的是异步生成 scalaz-stream 的值(嗯,来自 Akka 演员)。

我努力了:

  • async.unboundedQueue[String]
  • async.signal[String]

然后调用queue.enqueueOne(...).runorsignal.set(...).run并听queue.dequeueor signal.discrete。只是.map.to。有一个例子证明可以使用另一种流 - 使用处理器或文件中的行。

秘诀是什么?创建稍后流式传输的频道的首选方法是什么?如何用来自另一个上下文的值来喂养它?

谢谢!

4

1 回答 1

0

如果值是异步生成的,但可以从流中驱动,我发现使用“原始”await方法并“手动”构建过程是最简单的。您需要一个间接递归函数:

def processStep(v: Int): Process[Future, Int] =
  Process.emit(v) ++ Process.await(myActor ? NextValuePlease())(w => processStep(w))

但是如果你需要一个真正的异步进程,从其他地方驱动,我从来没有这样做过。

于 2014-11-19T23:15:18.170 回答