7

基于core.async 遍历示例,我在下面创建了类似的代码,以使用多个通道处理一些 CPU 密集型作业,超时时间为 10 秒。但是在主线程返回后,CPU 使用率保持在 700% 左右(8 个 CPU 机器)。我必须在 emacs 中手动运行 nrepl-close 才能关闭 Java 进程。

是否有任何适当的方法可以杀死 (go..) 块产生的宏线程?我试过接近!每个chan,但它不起作用。我想确保主线程返回后Java进程的CPU使用率回到0。

(defn [] RETURNED-STR-FROM-SOME-CPU-INTENSE-JOB (do...   (str ...)))


(let [n 1000
      cs (repeatedly n chan)]
  (doseq [c cs] 
    (go 
     (>! c  (RETURNED-STR-FROM-SOME-CPU-INTENSE-JOB ))))

  (dotimes [i n]
    (let [[result source] (alts!!  (conj cs (timeout 10000))) ]  ;;wait for 10 seconds for each job
      (if  (list-contains? cs source)  ;;if returned chan belongs to cs 
        (prn "OK JOB FINISHED " result)
        (prn "JOB TIMEOUT")
        )))

 (doseq [i cs]
   (close! i))  ;;not useful for "killing" macro thread

 (prn "JOBS ARE DONE"))

;;Btw list-contains? function is used to judge whether an element is in a list
;;http://stackoverflow.com/questions/3249334/test-whether-a-list-contains-a-specific-value-in-clojure
(defn list-contains? [coll value]
  (let [s (seq coll)]
    (if s
      (if (= (first s) value) true (recur (rest s) value))
      false)))
4

3 回答 3

2

在 REPL 中似乎还没有干净的方法。

我首先通过使用不推荐使用的方法 Thread.stop 尝试了一种非常肮脏的方式

 (doseq [i @threadpool ]
              (.stop i))

一旦主线程返回到 REPL,CPU 使用率似乎下降了,但如果我在 REPL 中再次运行程序,它只会挂在 go 块部分!

然后我四处搜索,发现了这个博客,上面写着

最后要注意的一件事:我们没有明确地做任何工作来关闭 go 例程。当 main 函数退出时,Go 例程将自动停止运行。因此,go 例程就像 JVM 中的守护线程(嗯,除了“线程”部分......)

所以我再次尝试将我的项目制作成一个 uberjar 并在命令控制台上运行它,结果发现当闪烁的光标返回控制台时 CPU 使用率会立即下降!

于 2013-09-10T02:01:01.160 回答
1

根据对另一个相关问题How to control number of threads in (go...) 的回答,我找到了一种更好的方法来正确杀死由 (go...) 块启动的所有线程:

首先更改 executor var 并提供自定义线程池

;; def, not defonce, so that the executor can be re-defined
;; Number of threads are fixed to be 4
(def my-executor
  (java.util.concurrent.Executors/newFixedThreadPool
   4
   (conc/counted-thread-factory "my-async-dispatch-%d" true)))

(alter-var-root #'clojure.core.async.impl.dispatch/executor
                (constantly (delay (tp/thread-pool-executor my-executor))))

然后在 (go...) 块的末尾调用 .shutdownNow 和 .awaitTermination 方法

(.shutdownNow my-executor)
(while (not  (.awaitTermination  my-executor 10 java.util.concurrent.TimeUnit/SECONDS ) )
       (prn "...waiting 10 secs for executor pool to finish") )

[UPDATE] 上面的shutdown executor方法好像不够纯。我的情况的最终解决方案是使用thunk-timeout函数将一个控制自己超时的函数发送到 go 块中。学分转到这篇文章。下面的例子

(defn toSendToGo [args timeoutUnits]
  (let [result (atom nil)  
        timeout? (atom false)]
    (try
      ( thunk-timeout
        (fn []  (reset! result  (myFunction args))) timeoutUnits)
      (catch  java.util.concurrent.TimeoutException e  (do  (prn "!Time out after " timeoutUnits " seconds!!") (reset! timeout? true))     ))

    (if @timeout?  (do sth))
    @result))


(let [c ( chan)]
  (go (>! c (toSendToGo args timeoutUnits))))
于 2013-09-25T04:42:37.407 回答
1
(shutdown-agents)

特定于实现的 JVM:代理和通道都使用全局线程池,代理的终止函数会迭代并关闭 VM 中所有打开的线程。首先清空通道:此操作是即时且不可逆的(特别是如果您在 REPL 中)。

于 2016-04-07T19:00:08.673 回答