0

我正在使用 Scala 从我们的列存储 Cassandra 中读取列。每列包含多个条目,n,其中 n 可以在 10 到 20 之间。我们读取一批条目,即一次 1000 个,并且必须从条目创建列;每个条目都有一个附加的 ID,我们可以使用它来分组。

目前,我们使用迭代器来遍历批次中的条目,并通过比较当前 ID 和先前 ID 来确定我们是否进入了新列,并且我们读取了许多批次,直到完成。我们需要在每个批次迭代结束时存储部分列,因为该列的其余部分将在下一个批次中。我在下面放了一些伪代码来演示我们目前使用的基本算法。

怎么能以功能性的方式做到这一点?(如果 n 是常数,这将是一个简单的问题,因为我们可以适当地设置批量大小。)

伪代码:

val resultBuffer // collects all columns
val columnBuffer // collects entries for current column
var currentId    // id of current column

while(batchIterator.hasNext){
     val batch = batchIterator.getNext
     val entryIterator = batch.entries.iterator

     while(entryIterator.hasNext){
           val entry = entryIterator.next
            if(entry.id != currentId) {
               currentId = entry.id  
               resultBuffer += columnBuilder(columnBuffer)
               columnBuffer.removeAll
               columnBuffer += entry
            } else {
                columnBuffer += entry
            } 
     }
}
4

1 回答 1

1

这是一个更实用的实现草图,它使用滑动来对条目迭代器中的条目进行分组:

val resultBuffer // collects all columns

batchIterator.foreach(batch => {
  val buffer = 
    batch.entries.sliding(2).foldLeft(new ColumnBuffer){(buffer, (curr, next)) =>
      if (curr.id != next.id) {
        resultBuffer += columnBuilder(buffer :+ entry /* Append entry to buffer */)
        new ColumnBuffer
      } else
        buffer += entry /* Return buffer with entry added */
    }

  if (buffer.nonEmpty) resultBuffer += columnBuilder(buffer)
}

在这里,唯一的“全局”对象必须是可变的resultBuffer。我们甚至可以通过将它作为另一个累加器包含在内部并用另一个foldLeft替换外部来摆脱它。foreachfoldLeft

如果运行时效率对您的代码至关重要,那么您绝对应该对各种可能的实现进行基准测试,以便在功能性和性能之间找到良好的折衷。


编辑 1:修复了草图中的一个错误,即存储在中的最后一个条目序列buffer未添加到resultBuffer. 该错误已存在于 OP 的代码中。


编辑 2:(针对 Chuck 的评论)

curr会将值entries(0)设为entries(entries.size() - 2),即不会处理最后一个元素。解决此问题的一种方法是将虚拟元素附加到迭代器,例如

(batch.entries ++ List(dummy)).sliding(2).foldLeft ...

这不好,更重要的是,它在为空时不起作用,因为它会产生单个 window 。另一种解决方案是包含在内部的累加器中,并在终止后处理它。我还没有解决它,但看起来这会使解决方案更不吸引人。batch.entriessliding(2)List(dummy)nextfoldLeftfoldLeft

于 2012-11-23T09:06:43.553 回答