0

我正在编写一个应用程序服务器,并且有一个消息发送循环。消息由字段组成,因此可以被视为遍历字段的迭代器。并且有一个消息队列由消息循环处理,但循环在任何时候都是可中断的(例如,当套接字缓冲区已满时),以后可以恢复。当前的实现如下所示:

private val messageQueue: Queue[Iterator[Field]]

sent = 0
breakable {
  for (iterator <- messageQueue) {
    for (field <- iterator) {
      ... breakable ...
    }
    sent += 1
  }
} finally messageQueue.trimStart(sent)

这可行而且还不错,但后来我想如果我可以用使用 ++ 运算符连接迭代器的迭代器替换队列,我可以使代码更简洁。说:

private val messageQueue: Iterator[Field] = message1.iterator ++ message2.iterator ++ ...

breakable {
  for (field <- messageQueue) {
    ... breakable ...
  }
}

现在代码看起来更干净了,但存在性能问题。连接的迭代器在内部形成一个(不平衡的)树,因此 next() 操作需要 O(n) 的时间。所以迭代需要 O(n^2) 的时间。

总而言之,消息只需要处理一次,因此队列不需要是 Traversable。一个迭代器(TraversableOnce)就可以了。我想将消息队列视为连续迭代器的集合,但 ++ 存在性能问题。是否有一个很好的解决方案可以使代码更清晰但同时又高效?

4

2 回答 2

2

您是否考虑过使用Stream#:::懒惰地将您的消息连接在一起?

private val messageQueue: Stream[Field] = message1.toStream #::: message2.toStream #::: ...

breakable {
  for (field <- messageQueue) {
    ... breakable ...
  }
}

至于这里的时间复杂度,我相信您要连接的迭代器数量为 O(n) (您需要调用toStream每个迭代器并将#:::它们一起调用)。但是,个人toStream#:::操作应该是 O(1),因为它们是懒惰的。以下是 的toStream实现Iterator

def toStream: Stream[A] =
    if (self.hasNext) Stream.cons(self.next, self.toStream)
    else Stream.empty[A]

这将花费恒定的时间,因为 Stream.cons 的第二个参数是按名称调用的,因此在您实际访问尾部之前不会对其进行评估。

但是,转换为 Stream将为每个元素访问添加一个恒定的开销因素,即,不仅调用next迭代器,还必须执行一些额外的方法调用来强制流的惰性尾部并访问包含的值。

于 2013-02-28T04:11:14.410 回答
2

如果你只是把它们弄平怎么办?

def flattenIterator[T](l: List[Iterator[T]]): Iterator[T] = l.iterator.flatten
于 2013-02-28T04:14:45.417 回答