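I have two different functions that each process a very large set of data, and each ultimately boils down to a boolean. The two values then need to be anded together to get the final result. My question is: what is the best way to create threads so that the two long-running functions can execute at the same time? My idea was something like this: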

(def f (future (longProcessOne data_one)))
(def g (future (longProcessTwo data_two)))
(and @f @g)

but I am looking for input on a better way to go about this.

2 Answers

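Your approach is fairly normal Clojure code. Another option is to use promises. If you need more complex processing, you might consider something like lamina, or, if you feel like living on the bleeding edge, you could try core.async: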

(ns async-example.core
  (:require [clojure.core.async :refer :all]))

(defn example []
  (let [a (chan)  ; a channel for a to report its answer
        b (chan)  ; a channel for b to report its answer
        output (chan)] ; a channel for the reporter to report back to the REPL
    (go (<! (timeout (rand-int 1000))) ; process a
        (>! a (rand-nth [true false])))
    (go (<! (timeout (rand-int 1000))) ; process b
        (>! b (rand-nth [true false])))
    (go (>! output (and (<! a) (<! b)))) ; the reporter process
    output)) ; return the channel that the result will be sent to

async-example.core> (<!! (go (<! (example))))
false
async-example.core> (<!! (go (<! (example))))
false
async-example.core> (<!! (go (<! (example))))
true

Of course, this is overkill for your situation, though it's tremendous fun anyway ;-)
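The promise alternative mentioned above could be sketched roughly as follows. This is only an illustration: long-process-one, long-process-two, and both-true? are hypothetical stand-ins for the two long-running functions in the question, not anything from a library.

```clojure
;; Hypothetical stand-ins for the two long-running functions in the question.
(defn long-process-one [data] (Thread/sleep 200) (even? (count data)))
(defn long-process-two [data] (Thread/sleep 300) (pos? (count data)))

(defn both-true?
  "Runs both processes on their own threads and ands the results."
  [data-one data-two]
  (let [p1 (promise)
        p2 (promise)]
    ;; Each worker thread delivers its result to its own promise.
    (.start (Thread. #(deliver p1 (long-process-one data-one))))
    (.start (Thread. #(deliver p2 (long-process-two data-two))))
    ;; Dereferencing blocks until the corresponding promise is delivered.
    (and @p1 @p2)))

(both-true? [1 2] [3]) ;=> true
(both-true? [1] [3])   ;=> false
```

One difference from the future version is worth keeping in mind: if a worker throws, its promise is never delivered and the deref blocks indefinitely, whereas dereferencing a future rethrows the exception.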

answered 2013-07-12T21:05:30.113

(Promise-based approach at the top, core.async-based approach lower down. Both short-circuit on the first falsey value.)


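Here's a version that makes use of the fact that a single promise can be delivered multiple times (although only the first delivery will succeed in setting its value; subsequent deliveries simply return nil with no side effects).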

(defn thread-and
  "Computes logical conjunction of return values of fs, each of which
  is called in a future. Short-circuits (cancelling the remaining
  futures) on first falsey value."
  [& fs]
  (let [done (promise)
        ret  (atom true)
        fps  (promise)]
    (deliver fps (doall (for [f fs]
                          (let [p (promise)]
                            [(future
                               (if-not (swap! ret #(and %1 %2) (f))
                                 (deliver done true))
                               (locking fps
                                 (deliver p true)
                                 (when (every? realized? (map peek @fps))
                                   (deliver done true))))
                             p]))))
    @done
    (doseq [[fut] @fps]
      (future-cancel fut))
    @ret))
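The promise behaviour this version relies on can be checked in isolation. A minimal REPL-style sketch (the name p is just for illustration):

```clojure
(def p (promise))

(deliver p :first)   ; first delivery sets the value
(deliver p :second)  ; later deliveries are ignored and return nil

@p                   ;=> :first
(realized? p)        ;=> true
```

This is why several futures can all call (deliver done true) in thread-and without coordinating: whichever gets there first wins, and the rest are no-ops.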

Some tests:

(thread-and (constantly true) (constantly true))
;;= true

(thread-and (constantly true) (constantly false))
;;= false

(every? false?
        (repeatedly 100000
                    #(thread-and (constantly true) (constantly false))))
;;= true

;; prints :foo, but not :bar
(thread-and #(do (Thread/sleep 1000) (println :foo))
            #(do (Thread/sleep 3000) (println :bar)))

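Putting ideas from Arthur's and A. Webb's answers together, you could use core.async to and the results together while short-circuiting on the first falsey value returned: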

(require '[clojure.core.async :refer [chan >!! alts!!]])

(defn thread-and
  "Call each of the fs on a separate thread. Return logical
  conjunction of the results. Short-circuit (and cancel the calls
  to remaining fs) on first falsey value returned."
  [& fs]
  (let [futs-and-cs
        (doall (for [f fs]
                 (let [c (chan)]
                   [(future (>!! c (f))) c])))]
    (loop [futs-and-cs futs-and-cs]
      (if (seq futs-and-cs)
        (let [[result c] (alts!! (map peek futs-and-cs))]
          (if result
            (recur (remove #(identical? (peek %) c)
                           futs-and-cs))
            (do (doseq [fut (map first futs-and-cs)]
                  (future-cancel fut))
                false)))
        true))))

Testing with (constantly true) and (constantly false):

(thread-and (constantly true) (constantly true))
;= true
(thread-and (constantly true) (constantly false))
;= false

;;; etc.

Also note that short-circuiting does indeed work:

;;; prints :foo before returning false
(thread-and #(do (Thread/sleep 3000) false)
            #(do (Thread/sleep 1000) (println :foo)))

;;; does not print :foo
(thread-and #(do (Thread/sleep 3000) false)
            #(do (Thread/sleep 7000) (println :foo)))
answered 2013-07-12T21:45:29.237