10
(require '[clojure.core.reducers :as r])

(def data (into [] (take 10000000 (repeatedly #(rand-int 1000)))))

(defn frequencies [coll]
  (reduce (fn [counts x]
    (merge-with + counts {x 1}))
    {} coll))

(defn pfrequencies [coll]
  (r/reduce (fn [counts x]
    (merge-with + counts {x 1}))
    {} coll))


user=> (time (do (frequencies data) nil))
"Elapsed time: 29697.183 msecs"

user=> (time (do (pfrequencies data) nil))
"Elapsed time: 25273.794 msecs"

user=> (time (do (frequencies data) nil))
"Elapsed time: 25384.086 msecs"

user=> (time (do (pfrequencies data) nil))
"Elapsed time: 25778.502 msecs"

And who can show me an example with significant speedup?

I'm running on Mac OSX 10.7.5 with Java 1.7 on an Intel Core i7 (2 cores, http://ark.intel.com/products/54617).

4

3 回答 3

19

您将其称为它pfrequencies,连同您parallel-processing在问题上的标签,表明您认为某些东西在这里使用了多个线程。事实并非如此,它也不是 reducers 库的“主要”目标。

reducer 给你的主要好处是你不需要为你的惰性序列分配许多中间 cons 单元。在引入 reducers 之前,frequencies将分配 10000000 个 cons 单元来创建reduce要使用的向量的顺序视图。既然存在 reducer,vector 就知道如何在不创建此类临时对象的情况下自行减少。但是该功能已被反向移植到clojure.core/reduce中,其行为完全一样r/reduce(忽略此处不相关的一些次要功能)。因此,您只是将您的函数与自身的相同克隆进行基准测试。

reducers 库还包含 a 的概念fold,它可以并行执行一些工作,然后将中间结果合并在一起。要使用它,您需要提供比reduce需要更多的信息:您必须定义如何从无到有开始一个“块”;你的函数必须是关联的;并且您必须指定如何组合块。A. Webb 的回答演示了如何fold正确使用,在多个线程上完成工作。

但是,您不太可能从折叠中获得任何好处:除了他指出的原因(与 相比,您放弃瞬态clojure.core/frequencies)之外,构建地图并不容易并行化。如果其中的大部分工作frequencies是添加的(就像在类似的东西中一样(frequencies (repeat 1e6 1))),那么fold会有所帮助;但大部分工作是管理哈希图中的键,最终必须是单线程的。您可以并行构建地图,但必须将它们合并在一起;由于该组合步骤所花费的时间与块的大小成正比,而不是恒定时间,因此无论如何通过在单独的线程上执行块来获得很少。

于 2013-05-20T18:16:35.647 回答
5

您的频率函数的一个fold版本看起来像

(defn pfrequencies [coll] 
  (r/fold 
    (fn combinef
      ([] {})
      ([x y] (merge-with + x y)))
    (fn reducef
      ([counts x] (merge-with + counts {x 1})))
    coll))

在 2 个内核上,它可能会比clojure.core/frequencies使用瞬变的要慢得多。至少在 4 个内核上,它比第一个实现快(2 倍),但仍然比clojure.core/frequencies.

你也可以尝试

(defn p2frequencies [coll]
  (apply merge-with + (pmap clojure.core/frequencies (partition-all 512 coll))))
于 2013-05-20T18:04:31.833 回答
4

这里的答案中有一些值得深思的地方。在这种特定情况下,不需要映射,因为可以轻松预测结果域并将其放入可以使用索引的向量中。因此,一个幼稚问题的幼稚实现将类似于:

(defn freqs
  [coll]
  (reduce (fn [counts x] (assoc counts x (inc (get counts x))))
          (vec (int-array 1000 0))
          coll))

(defn rfreqs
     [coll]
     (r/fold
       (fn combinef
         ([] (vec (int-array 1000 0)))
         ([& cols] (apply mapv + cols)))
       (fn reducef
         [counts x] (assoc counts x (inc (get counts x))))
       coll))

在这里,combinef 将是对结果集合的 1000 列的简单映射添加,应该可以忽略不计。

这使 reducer 版本的速度比普通版本快 2-3 倍,尤其是在更大的(10x-100x)数据集上。对 r/fold 的分区大小(可选的 'n' 参数)进行一些调整可以作为微调来完成。似乎最适合使用 (* 16 1024) 和 1E8 的数据大小(至少需要 6GB JVM)。

您甚至可以在两个版本中使用瞬变,但我没有注意到太多改进。

我知道这个版本不适合一般用途,但它可能会在没有哈希管理开销的情况下显示速度提高。

于 2013-05-21T15:13:59.420 回答