给定一个非常大的collection.parallel.mutable.ParHashMap实例(或任何其他并行集合),一旦找到给定的(例如 50 个)匹配数,如何中止过滤并行扫描?
尝试在线程安全的“外部”数据结构中累积中间匹配项或保留具有结果计数的外部 AtomicInteger 在 4 个内核上似乎比使用常规collection.mutable.HashMap并将单个内核固定为 100慢 2 到 3 倍%。
我知道Par* 集合上的find或存在确实会“在内部”中止。有没有一种方法可以概括这一点以找到多个结果?
这里的代码在 ParHashMap 上似乎仍然慢了 2 到 3 倍,大约有 79,000 个条目,并且还存在将超过 maxResults 结果填充到结果 CHM 中的问题(这可能是由于线程在incrementAndGet之后但在break之前被抢占允许其他线程添加更多元素)。更新:似乎速度变慢是由于工作线程在 counter.incrementAndGet() 上竞争,这当然违背了整个并行扫描的目的:-(
def find(filter: Node => Boolean, maxResults: Int): Iterable[Node] =
{
val counter = new AtomicInteger(0)
val results = new ConcurrentHashMap[Key, Node](maxResults)
import util.control.Breaks._
breakable
{
for ((key, node) <- parHashMap if filter(node))
{
results.put(key, node)
val total = counter.incrementAndGet()
if (total > maxResults) break
}
}
results.values.toArray(new Array[Node](results.size))
}