我有许多(未评估的)表达式保存在一个向量中;[ expr1 expr2 expr3 ... ]
我想做的是将每个表达式交给一个单独的线程并等待一个返回值。那时我对其他线程的结果不感兴趣,想取消它们以节省 CPU 资源。
(我意识到这可能会导致不确定性,因为程序的不同运行可能会导致首先评估不同的表达式。我手头有这个。)
是否有实现上述目标的标准/惯用方式?
我有许多(未评估的)表达式保存在一个向量中;[ expr1 expr2 expr3 ... ]
我想做的是将每个表达式交给一个单独的线程并等待一个返回值。那时我对其他线程的结果不感兴趣,想取消它们以节省 CPU 资源。
(我意识到这可能会导致不确定性,因为程序的不同运行可能会导致首先评估不同的表达式。我手头有这个。)
是否有实现上述目标的标准/惯用方式?
这是我的看法。
基本上,您必须在每个期货中解决一个全局承诺,然后返回一个包含未来列表和已解决值的向量,然后取消列表中的所有期货:
(defn run-and-cancel [& expr]
(let [p (promise)
run-futures (fn [& expr] [(doall (map #(future (deliver p (eval %1))) expr)) @p])
[fs res] (apply run-futures expr)]
(map future-cancel fs)
res))
它尚未正式发布,但core.async
看起来它可能是一种有趣的方式来解决您的问题 - 以及其他异步问题,非常巧妙。
leiningen 的咒语core.async
(目前)如下:
[org.clojure/core.async "0.1.0-SNAPSHOT"]
这里有一些代码可以创建一个函数,该函数需要一些耗时的函数,并阻塞直到其中一个返回。
(require '[clojure.core.async :refer [>!! chan alts!! thread]]))
(defn return-first [& ops]
(let [v (map vector ops (repeatedly chan))]
(doseq [[op c] v]
(thread (>!! c (op))))
(let [[value channel] (alts!! (map second v))]
value)))
;; Make sure the function returns what we expect with a simple Thread/sleep
(assert (= (return-first (fn [] (Thread/sleep 3000) 3000)
(fn [] (Thread/sleep 2000) 2000)
(fn [] (Thread/sleep 5000) 5000))
2000))
在上面的示例中:
chan
创建一个异步通道>!!
将值放入通道thread
在另一个线程中执行主体alts!!
接受一个通道向量,并在其中任何一个出现值时返回还有比这更多的方法,我仍在思考它,但这里有一个演练:https ://github.com/clojure/core.async/blob/master/examples/walkthrough.clj
David Nolen 的博客上有一些很棒的帖子,如果令人难以置信的话(http://swannodette.github.io/)
编辑
刚刚看到 Michał Marczyk 回答了一个非常相似的问题,但更好的是,在这里,它允许您取消/短路。 使用 Clojure 处理长时间运行的进程并比较它们的返回
你想要的是 Java 的CompletionService。我不知道clojure中有任何包装器,但是使用互操作并不难。下面的示例大致基于ExecutorCompletionService的 JavaDoc 页面上的示例。
(defn f [col]
(let [cs (ExecutorCompletionService. (Executors/newCachedThreadPool))
futures (map #(.submit cs %) col)
result (.get (.take cs))]
(map #(.cancel % true) futures)
result))
您可以使用 future-call 来获取所有期货的列表,并将它们存储在 Atom 中。然后,用“在头部射击其他未来”功能组合每个正在运行的未来,这样第一个将终止所有剩余的未来。这是一个例子:
(defn first-out [& fns]
(let [fs (atom [])
terminate (fn [] (println "cancling..") (doall (map future-cancel @fs)))]
(reset! fs (doall (map (fn [x] (future-call #((x) (terminate)))) fns)))))
(defn wait-for [n s]
(fn [] (print "start...") (flush) (Thread/sleep n) (print s) (flush)))
(first-out (wait-for 1000 "long") (wait-for 500 "short"))
编辑
刚刚注意到前面的代码没有返回第一个结果,所以它主要用于副作用。这是另一个使用承诺返回第一个结果的版本:
(defn first-out [& fns]
(let [fs (atom [])
ret (promise)
terminate (fn [x] (println "cancling.." )
(doall (map future-cancel @fs))
(deliver ret x))]
(reset! fs (doall (map (fn [x] (future-call #(terminate (x)))) fns)))
@ret))
(defn wait-for [n s]
"this time, return the value"
(fn [] (print "start...") (flush) (Thread/sleep n) (print s) (flush) s))
(first-out (wait-for 1000 "long") (wait-for 500 "short"))
虽然我不知道是否有一种惯用的方式来实现您的目标,但 Clojure Future看起来很合适。
接受一个表达式主体并产生一个未来对象,该对象将在另一个线程中调用主体,并将缓存结果并在所有后续调用 deref/@ 时返回它。如果计算尚未完成,对 deref/@ 的调用将阻塞,除非使用带超时的 deref 变体。