我正在使用 Scala 从我们的列存储 Cassandra 中读取列。每列包含多个条目,n,其中 n 可以在 10 到 20 之间。我们读取一批条目,即一次 1000 个,并且必须从条目创建列;每个条目都有一个附加的 ID,我们可以使用它来分组。
目前,我们使用迭代器来遍历批次中的条目,并通过比较当前 ID 和先前 ID 来确定我们是否进入了新列,并且我们读取了许多批次,直到完成。我们需要在每个批次迭代结束时存储部分列,因为该列的其余部分将在下一个批次中。我在下面放了一些伪代码来演示我们目前使用的基本算法。
怎么能以功能性的方式做到这一点?(如果 n 是常数,这将是一个简单的问题,因为我们可以适当地设置批量大小。)
伪代码:
val resultBuffer // collects all columns
val columnBuffer // collects entries for current column
var currentId // id of current column
while(batchIterator.hasNext){
val batch = batchIterator.getNext
val entryIterator = batch.entries.iterator
while(entryIterator.hasNext){
val entry = entryIterator.next
if(entry.id != currentId) {
currentId = entry.id
resultBuffer += columnBuilder(columnBuffer)
columnBuffer.removeAll
columnBuffer += entry
} else {
columnBuffer += entry
}
}
}