7

我有一个简单的程序:

import scalaz._
import stream._

object Play extends App {
  val in1 = io.linesR("C:/tmp/as.txt")
  val in2 = io.linesR("C:/tmp/bs.txt")

  val p = (in1 merge in2) to io.stdOutLines
  p.run.run
}

文件as.txt包含 5 个a,文件bs.txt包含 3 个b。我看到这种输出:

a
b
b
a
a
b
a
a
a

但是,当我更改in2如下声明时:

val in2 = io.stdInLines

然后我得到了我认为是意外的行为。根据文档1,程序应该根据哪个流更快地提供数据,从每个流中非确定性地提取数据。这应该意味着我看到一堆as 立即打印到控制台,但这根本不是发生的事情。

确实,在我按下 之前ENTER,什么都没有发生。很明显,如果我随机选择一个流来获取下一个元素,那么行为看起来很像我所期望的,然后,如果该流被阻塞,合并的进程也会阻塞(即使另一个流包含数据)。

到底是怎么回事?

1 - 好吧,文档很少,但Dan Spiewak在他的演讲中非常清楚地,它会抓住第一个提供数据的人

4

1 回答 1

6

问题在于stdInLines. 它是阻塞的,它从来不是Task.fork线程。

尝试将实现更改stdInLines为这个:

def stdInLines: Process[Task,String] =
    Process.repeatEval(Task.apply { 
    Option(scala.Console.readLine())
    .getOrElse(throw Cause.Terminated(Cause.End))
})

原来io.stdInLines的在同一个线程中运行readLine(),所以它总是在那里等待,直到你输入一些东西。

于 2014-11-18T21:24:48.080 回答