17

我根本没有在 Clojure 中使用过多线程,所以不确定从哪里开始。

我有一个doseq身体可以并行运行。我想要的是总是有 3 个线程在运行(留下 1 个核心空闲)并行评估主体,直到范围用完。没有共享状态,没有什么复杂的——相当于 Python 的多处理就可以了。

所以像:

(dopar 3 [i (range 100)]
  ; repeated 100 times in 3 parallel threads...
  ...)

我应该从哪里开始寻找?有这个命令吗?标准包装?一个好的参考?

到目前为止,我已经找到pmap,并且可以使用它(我如何一次限制为 3 个?看起来它一次使用 32 个- 不,消息来源说 2 + 个处理器),但似乎这是一个基本的原语那应该已经存在于某个地方。

澄清:我真的很想控制线程数。我有长时间运行的进程并使用大量内存,因此创建大量进程并希望一切正常并不是一个好方法(使用大量可用内存的示例)。

更新:开始编写执行此操作的宏,我需要一个信号量(或互斥体,或我可以等待的原子)。Clojure 中是否存在信号量?或者我应该使用 ThreadPoolExecutor?不得不从 Java 中提取这么多东西似乎很奇怪——我认为 Clojure 中的并行编程应该很容易......也许我在想这个完全错误的方式?嗯。代理?

4

7 回答 7

6

好的,我我想要的是agent每个循环都有一个,数据使用send. 使用触发的代理send从线程池运行,因此数量在某种程度上受到限制(它不能精确控制三个线程,但现在必须这样做)。

[Dave Ray 在评论中解释:要控制池大小,我需要自己编写]

(defmacro dopar [seq-expr & body]
  (assert (= 2 (count seq-expr)) "single pair of forms in sequence expression")
  (let [[k v] seq-expr]
    `(apply await
       (for [k# ~v]
         (let [a# (agent k#)]
           (send a# (fn [~k] ~@body))
         a#)))))

可以像这样使用:

(deftest test-dump
  (dopar [n (range 7 11)]
    (time (do-dump-single "/tmp/single" "a" n 10000000))))

耶!作品!我好棒!(好吧,Clojure 也有点摇滚)。 相关博文

于 2012-06-10T17:18:41.533 回答
5

pmap在大多数情况下实际上可以正常工作 - 它为您的机器使用具有合理数量的线程的线程池。我不会费心尝试创建自己的机制来控制线程数,除非您有真正的基准证据表明默认值会导致问题。

话虽如此,如果您真的想限制最多三个线程,一个简单的方法是在范围的 3 个子集上使用 pmap:

(defn split-equally [num coll] 
  "Split a collection into a vector of (as close as possible) equally sized parts"
  (loop [num num 
         parts []
         coll coll
         c (count coll)]
    (if (<= num 0)
      parts
      (let [t (quot (+ c num -1) num)]
        (recur (dec num) (conj parts (take t coll)) (drop t coll) (- c t)))))) 

(defmacro dopar [thread-count [sym coll] & body]
 `(doall (pmap 
    (fn [vals#]
      (doseq [~sym vals#]
        ~@body))  
    (split-equally ~thread-count ~coll))))

请注意使用doall,这是强制评估pmap(这是惰性的)所必需的。

于 2012-06-11T03:25:38.560 回答
5

现在实际上有一个图书馆可以做到这一点。从他们的github

Claypoole 库提供了 Clojure 函数的基于线程池的并行版本,例如pmapfuturefor.

它提供相同的有序/无序版本。

于 2017-08-30T15:34:31.993 回答
4

你为什么不只使用pmap?您仍然无法控制线程池,但它比编写使用代理的自定义宏要少得多(为什么不使用期货?)。

于 2012-06-10T19:05:35.753 回答
4

我对以下要求有类似的问题:

  1. 控制使用的线程数;
  2. 不知道线程池的管理;
  3. 不需要保持任务的顺序;
  4. 任务的处理时间可以不同,因此不能保留任务的顺序,而较早完成的任务应该更早返回;
  5. 懒惰地评估和提交输入序列;
  6. 输入序列中的元素不应被越界读取,而应根据返回的结果进行缓冲和读取,以避免出现内存不足的问题。

核心pmap函数只满足最后两个假设。

这是一个满足这些假设的实现,它使用标准 Java 线程池ExecutorService以及输入流的一个CompletionService和一些分区:

(require '[clojure.tools.logging :as log])

(import [java.util.concurrent ExecutorService ExecutorCompletionService 
                              CompletionService Future])

(defn take-seq
  [^CompletionService pool]
  (lazy-seq
   (let [^Future result (.take pool)]
     (cons (.get result)
           (take-seq pool)))))

(defn qmap
  [^ExecutorService pool chunk-size f coll]
  (let [worker (ExecutorCompletionService. pool)]
    (mapcat
     (fn [chunk]
       (let [actual-size (atom 0)]
         (log/debug "Submitting payload for processing")
         (doseq [item chunk]
           (.submit worker #(f item))
           (swap! actual-size inc))
         (log/debug "Outputting completed results for" @actual-size "trades")
         (take @actual-size (take-seq worker))))
     (partition-all chunk-size coll))))

可以看出qmap并没有实例化线程池本身,而只是ExecutorCompletionService. 例如,这允许传入一个固定大小ThreadPoolExecutorService。此外,由于qmap返回一个惰性序列,它不能也不能管理线程池资源本身。最后,chunk-size允许限制输入序列的多少元素被实现并作为任务一次提交。

下面的代码演示了正确的用法:

(import [java.util.concurrent Executors])

(let [thread-pool (Executors/newFixedThreadPool 3)]
  (try
    (doseq [result (qmap thread-pool
                         ;; submit no more than 500 tasks at once
                         500 
                         long-running-resource-intensive-fn
                         unboundedly-large-lazy-input-coll)]
      (println result))
    (finally
      ;; (.shutdown) only prohibits submitting new tasks,
      ;; (.shutdownNow) will even cancel already submitted tasks.
      (.shutdownNow thread-pool))))

以下是一些使用的 Java 并发类的文档:

于 2013-10-30T16:19:14.233 回答
2

Not sure if it is idiomatic, as I'm still quite a beginner with Clojure, but the following solution works for me and it also looks pretty concise:

(let [number-of-threads 3
      await-timeout 1000]
  (doseq [p-items (partition number-of-threads items)]
    (let [agents (map agent p-items)]
      (doseq [a agents] (send-off a process))
      (apply await-for await-timeout agents)
      (map deref agents))))
于 2012-06-19T03:43:16.593 回答
0

使用管道和渠道。如果您的操作是 IO 绑定的,这是一个更可取的选项,因为 pmap 的池绑定到 CPU 数量。

另一个不错的选择是使用代理和发送,它在下面使用 cachedThredPoolExecutor。

于 2018-05-28T11:13:02.433 回答