5

我有许多(未评估的)表达式保存在一个向量中;[ expr1 expr2 expr3 ... ]

我想做的是将每个表达式交给一个单独的线程并等待一个返回值。那时我对其他线程的结果不感兴趣,想取消它们以节省 CPU 资源。

(我意识到这可能会导致不确定性,因为程序的不同运行可能会导致首先评估不同的表达式。我手头有这个。)

是否有实现上述目标的标准/惯用方式?

4

5 回答 5

5

这是我的看法。

基本上,您必须在每个期货中解决一个全局承诺,然后返回一个包含未来列表和已解决值的向量,然后取消列表中的所有期货:

(defn run-and-cancel [& expr]
    (let [p (promise)
          run-futures (fn [& expr] [(doall (map #(future (deliver p (eval %1))) expr)) @p])
          [fs res] (apply run-futures expr)]
        (map future-cancel fs)
        res))
于 2013-08-20T07:04:05.780 回答
2

它尚未正式发布,但core.async看起来它可能是一种有趣的方式来解决您的问题 - 以及其他异步问题,非常巧妙。

leiningen 的咒语core.async(目前)如下:

[org.clojure/core.async "0.1.0-SNAPSHOT"]

这里有一些代码可以创建一个函数,该函数需要一些耗时的函数,并阻塞直到其中一个返回。

(require '[clojure.core.async :refer [>!! chan alts!! thread]]))     

(defn return-first [& ops]
  (let [v (map vector ops (repeatedly chan))]
    (doseq [[op c] v]
      (thread (>!! c (op))))
    (let [[value channel] (alts!! (map second v))]
         value)))

;; Make sure the function returns what we expect with a simple Thread/sleep
(assert (= (return-first (fn [] (Thread/sleep 3000) 3000)
                         (fn [] (Thread/sleep 2000) 2000)
                         (fn [] (Thread/sleep 5000) 5000))
            2000))

在上面的示例中:

  • chan创建一个异步通道
  • >!!将值放入通道
  • thread在另一个线程中执行主体
  • alts!!接受一个通道向量,并在其中任何一个出现值时返回

还有比这更多的方法,我仍在思考它,但这里有一个演练:https ://github.com/clojure/core.async/blob/master/examples/walkthrough.clj

David Nolen 的博客上有一些很棒的帖子,如果令人难以置信的话(http://swannodette.github.io/

编辑

刚刚看到 Michał Marczyk 回答了一个非常相似的问题,但更好的是,在这里,允许您取消/短路。 使用 Clojure 处理长时间运行的进程并比较它们的返回

于 2013-08-19T15:20:19.977 回答
1

你想要的是 Java 的CompletionService。我不知道clojure中有任何包装器,但是使用互操作并不难。下面的示例大致基于ExecutorCompletionService的 JavaDoc 页面上的示例。

(defn f [col] 
    (let [cs (ExecutorCompletionService. (Executors/newCachedThreadPool))
          futures (map #(.submit cs %) col)
          result (.get (.take cs))]
        (map #(.cancel % true) futures)
        result))
于 2013-08-19T14:58:00.643 回答
0

您可以使用 future-call 来获取所有期货的列表,并将它们存储在 Atom 中。然后,用“在头部射击其他未来”功能组合每个正在运行的未来,这样第一个将终止所有剩余的未来。这是一个例子

(defn first-out [& fns]
  (let [fs (atom [])
        terminate (fn [] (println "cancling..") (doall (map future-cancel @fs)))]
    (reset! fs (doall (map (fn [x] (future-call #((x) (terminate)))) fns)))))

(defn wait-for [n s]
  (fn [] (print "start...") (flush) (Thread/sleep n) (print s) (flush)))

(first-out (wait-for 1000 "long") (wait-for 500 "short"))

编辑

刚刚注意到前面的代码没有返回第一个结果,所以它主要用于副作用。这是另一个使用承诺返回第一个结果的版本:

(defn first-out [& fns]
  (let [fs (atom [])
        ret (promise)
        terminate (fn [x] (println "cancling.." ) 
                          (doall (map future-cancel @fs))
                          (deliver ret x))]
    (reset! fs (doall (map (fn [x] (future-call #(terminate (x)))) fns)))
    @ret))

(defn wait-for [n s]
  "this time, return the value"
  (fn [] (print "start...") (flush) (Thread/sleep n) (print s) (flush) s))

(first-out (wait-for 1000 "long") (wait-for 500 "short"))
于 2013-08-19T23:59:05.780 回答
-1

虽然我不知道是否有一种惯用的方式来实现您的目标,但 Clojure Future看起来很合适。

接受一个表达式主体并产生一个未来对象,该对象将在另一个线程中调用主体,并将缓存结果并在所有后续调用 deref/@ 时返回它。如果计算尚未完成,对 deref/@ 的调用将阻塞,除非使用带超时的 deref 变体。

于 2013-08-19T13:41:06.533 回答