9

我试图弄清楚如何使用 clojure 将一个简单的操作有效地并行应用于一个大序列。我希望能够使用并行解决方案来利用我机器上的多个内核来实现一些加速。

我正在尝试将 pmap 与 partition-all 结合使用,以减少为输入序列中的每个项目创建未来的开销。不幸的是,partition-all 强制对每个分区序列进行完整评估。这会在我的机器上导致 OutOfMemoryError。

(defn sum [vs]
  (reduce + vs))

(def workers
  (+ 2 (.. Runtime getRuntime availableProcessors)))

(let
  [n 80000000
   vs (range n)]

  (time (sum vs))
  (time (sum (pmap sum (partition-all (long (/ n workers)) vs)))))

如何将 sum 应用于大型输入集,并超越串行实现的性能?

解决方案

感谢 @Arthur Ulfeldt 指出 reducers 库。这是使用减速器的解决方案。此代码显示了在多核机器上运行时预期的性能改进。(注意:我已将 vs 更改为一个函数,以使时间更准确)

(require '[clojure.core.reducers :as r])

(let
  [n 80000000
   vs #(range n)]

  (time (reduce + (vs)))
  (time (r/fold + (vs)))
4

1 回答 1

9

使用 pmap 时,我发现需要相当大的块来克服切换和未来的开销,尝试使用 10,000 的块大小来实现与+. 潜在收益受限于生成块的开销。这会产生一个平衡可用核心和制作块所需时间的最佳值。在这种情况下,+作为工作负载,我无法使其比单线程选项更快。

如果您有兴趣在没有 pmap 的情况下执行此操作并可能使用 fork/join,请查看新的(ish)reducers 库

OOM 情况来自第一个测试实现惰性序列(range n),然后保留该惰性序列,以便将其传递给第二个序列。

如果我通过定义一个函数使 + 函数变得更慢,slow+并使用单线程、块上的 pmap 和带有 forkJoin 的减速器之间的差异变得可见:

user> *clojure-version*                                                             
{:major 1, :minor 5, :incremental 0, :qualifier "RC15"}
(require '[clojure.core.reducers :as r]) 

(def workers
  (+ 2 (.. Runtime getRuntime availableProcessors)))

(defn slow+
  ([] 0)
  ([x] x)
  ([x y] (reduce + (range 100000)) (+ x y)))

(defn run-test []
  (let [n 8000]
   (time (reduce slow+ (range n)))
   (time (reduce slow+ (pmap #(reduce slow+ %) (partition-all (* workers 100) (range n)))))
   (time (r/fold slow+ (vec (range n)))))) 

user> (run-test)
"Elapsed time: 28655.951241 msecs" ; one thread
"Elapsed time: 6975.488591 msecs"  ; pmap over chunks
"Elapsed time: 8170.254426 msecs"  ; using reducer
于 2013-02-11T18:47:18.493 回答