2

我正在尝试使用具有昂贵操作的 scalaz-stream 处理数据流※。

scala> :paste
// Entering paste mode (ctrl-D to finish)

    def expensive[T](x:T): T = {
      println(s"EXPENSIVE! $x")
      x
    }
    ^D
// Exiting paste mode, now interpreting.

expensive: [T](x: T)T

※是的,是的,我知道在代码中混入副作用是不好的函数式编程风格。打印语句只是跟踪调用昂贵()的次数。)

在将数据传递给昂贵的操作之前,我首先需要将其拆分成块。

scala> val chunked: Process[Task,Vector[Int]] = Process.range(0,4).chunk(2)
chunked: scalaz.stream.Process[scalaz.concurrent.Task,Vector[Int]] = Await(scalaz.concurrent.Task@7ef516f3,<function1>,Emit(SeqView(...),Halt(scalaz.stream.Process$End$)),Emit(SeqView(...),Halt(scalaz.stream.Process$End$)))

scala> chunked.runLog.run
res1: scala.collection.immutable.IndexedSeq[Vector[Int]] = Vector(Vector(0, 1), Vector(2, 3), Vector())

然后我将昂贵的操作映射到块流上。

scala> val processed = chunked.map(expensive)
processed: scalaz.stream.Process[scalaz.concurrent.Task,Vector[Int]] = Await(scalaz.concurrent.Task@7ef516f3,<function1>,Emit(SeqViewM(...),Halt(scalaz.stream.Process$End$)),Emit(SeqViewM(...),Halt(scalaz.stream.Process$End$)))

当我执行此操作时,它会按预期的次数调用昂贵的():

scala> processed.runLog.run
EXPENSIVE! Vector(0, 1)
EXPENSIVE! Vector(2, 3)
EXPENSIVE! Vector()
res2: scala.collection.immutable.IndexedSeq[Vector[Int]] = Vector(Vector(0, 1), Vector(2, 3), Vector())

但是,如果我链接到 zipWithIndex 的调用,昂贵的()会被调用很多次:

>scala processed.zipWithIndex.runLog.run
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector(0)
EXPENSIVE! Vector(0)
EXPENSIVE! Vector(0)
EXPENSIVE! Vector(0)
EXPENSIVE! Vector(0, 1)
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector(2)
EXPENSIVE! Vector(2)
EXPENSIVE! Vector(2)
EXPENSIVE! Vector(2)
EXPENSIVE! Vector(2, 3)
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
res3: scala.collection.immutable.IndexedSeq[(Vector[Int], Int)] = Vector((Vector(0, 1),0), (Vector(2, 3),1), (Vector(),2))

这是一个错误吗?如果这是所需的行为,有人可以解释为什么吗?如果昂贵的()需要很长时间,您就会明白为什么我更喜欢调用更少的结果。

这是一个带有更多示例的要点:https ://gist.github.com/underspecified/11279251

4

1 回答 1

2

您会看到这个问题,它可以采取多种不同的形式。问题本质上是map可以看到(并使用)在chunk建立结果时正在采取的中间步骤。

这种行为将来可能会改变,但同时有几个可能的解决方法。最简单的方法之一是将昂贵的函数包装在一个进程中并使用flatMap而不是map

chunked.flatMap(a =>
  Process.eval(Task.delay(expensive(a)))
).zipWithIndex.runLog.run

另一种解决方案是将昂贵的功能包装在有效的渠道中:

def expensiveChannel[A] = Process.constant((a: A) => Task.delay(expensive(a)))

现在您可以使用through

chunked.through(expensiveChannel).zipWithIndex.runLog.run

虽然当前的行为可能有点令人惊讶,但这也是一个很好的提醒,您应该使用类型系统来帮助您跟踪您关心的所有效果(长时间运行的计算可以是其中之一)。

于 2014-04-25T11:40:04.583 回答