0

我不确定我的问题的正确语言,所以请随时为我提供正确的术语。

假设我有一个进程 A,它输出一个迭代器(惰性求值)这会产生 Iterator[A]

然后我有另一个进程 B,它映射返回 Iterator[B] 的事件

这将持续几个进程 Iterator[A] -> Iterator[B] -> Iterator[C] -> ---

现在最终我将此流评估为列表[Z]。这为我节省了使用 List[A] -> List[B] -> List[C] 等的内存命中

现在我想通过引入并行化来提高性能,但我不想并行化跨迭代器的每个元素的评估,而是并行化每个迭代器堆栈。因此,在这种情况下,进程 A 的线程为 Iterator[A] 填充 Queue[A],进程 B 的线程从 Queue[A] 获取,应用任何映射,然后添加到 Iterator[B] 的 Queue[B]从中读取。

现在我以前通过设计自己的异步队列在其他语言中做到了这一点,我想知道 Scala 必须解决这个问题。

4

2 回答 2

1

这是我使用演员制作的第一个刺伤解决方案。它完全阻塞,所以也许可以开发一个使用期货的实现

case class AsyncIterator[T](iterator:Iterator[T]) extends Iterator[T] {
  private val queue = new scala.collection.mutable.SynchronizedQueue[Int]()
  private var end = !iterator.hasNext

  def hasNext() = {
    if (end) false
    else if (!queue.isEmpty) true
    else hasNext
  }

  def next() = {
    while (q.isEmpty) {
      if (end) throw new Exception("blah")
    }
    q.dequeue()
  }

  private val producer: Actor = actor {
    loop {
      if (!iterator.hasNext) {
        end = true
        exit
      }
      else {
        q.enqueue(iterator.next)
      }
    }
  }
  producer.start()
}
于 2012-12-11T18:34:31.867 回答
-3

既然你对其他语言持开放态度,那么 Go 怎么样?

最近有一个关于如何构建事件驱动管道的讨论,这将实现与您描述的相同但以完全不同的方式。

可以说,考虑和设计事件管道比推理惰性迭代器更容易,因为它变成了一个数据流系统,其中每个阶段的关键问题是“这个阶段对单个实体有什么作用?” 而不是“我怎样才能有效地迭代许多实体?”

一旦实现了事件驱动的管道,如何使其并发或并行的问题就没有实际意义了——您已经完成了。

于 2012-12-12T13:42:45.467 回答