1

tl;dr 为什么 Clojure 会在其中创建一个单独Writer的线程newFixedThreadPool?为什么池终止后可能会被刷新?为什么这种行为只能在 Cursive 中重现?

假设我们有一个应用程序在单独的线程中做某事,并且某事写入stdout. 假设在我们完成所有操作之后,我们想要打印一条最终消息。

我们将遇到的第一件事是 Clojure 的println,如果提供多个​​参数,将产生交错输出。这在此处进行了介绍。

但似乎还有另一个问题。如果我们运行这样的事情:

(defn main []
  (let [pool (make-pool num-threads)]
    (print-multithreaded pool "Hello, world!")
    (shutdown-pool pool))
  (safe-println "All done, have a nice day."))

我们有时会有

Hello, world!
All done, have a nice day.

而有时

All done, have a nice day.
Hello, world!

也许flush在每次写入之后?

(defn safe-println [& more]
  (.write *out* (str (clojure.string/join " " more) "\n"))
  (.flush *out*))

不工作。有效的方法是在 之上使用显式的 Java 互操作System.out,如下所示:

(defn safe-println [& more]
  (let [writer (System/out)]
    (.println writer (str (clojure.string/join " " more)))
    (.flush writer)))

制作or也可以writer(PrintWriter. System/out)(OutputStreamWriter. System/out)

似乎我们*out*的线程中有不同的 s ......确实,

(def out *out*)
(defn safe-println [& more]
  (.write out (str (clojure.string/join " " more) "\n"))
  (.flush out))

作品。

那么问题来了:为什么会发生这种情况?对于 Java 部分,这是有道理的:System.out是静态最终的,因此所有线程只存在一个实例,并且一切都与它对话,所以一切都添加到同一个缓冲区。通过打印到 Clojure 的*out*,主线程和池线程有自己的*out*,有自己的缓冲区(对于主线程,它是 a PrintWriter,对于池线程,它是 shared OutputStreamWriter)。我真的不明白为什么会这样,我也不明白为什么会导致排序不一致:我们在调用最终打印之前明确地完成了所有线程,这应该会导致隐式flush。但是即使我们添加了一个显式flush的,结果还是一样的。

我可能在这里遗漏了一些非常明显的细节,如果你能帮助我,我会很高兴。如果您想查看整个可重现的示例,由于篇幅较长,我在此处不包括在内,这里是要点的链接:https ://gist.github.com/trueneu/b8498aa259899a8fc979090fccf632de

编辑:第一个版本的 gist 确实有效,你必须修补它才能打破它,所以我编辑它以从一开始就展示“不正确”的行为。

另外,为了消除任何误解,这里是草书的截图:https ://ibb.co/jHqSL0

EDIT2:这是在原始问题中指出的,但我会强调一下。了解这种行为的要点和机制是问题的一半。*out*不会为每个线程创建新的。但它似乎为线程池创建了一个单独的线程池。(对于这个输出,减少到 1,并添加tonum-threads的打印。增加不会产生新的对象地址):(.toString *out*)safe-printlnnum-threads

(main)
java.io.PrintWriter@1dcc77c6
All done, have a nice day.
=> nil
java.io.OutputStreamWriter@7104a76f
Hello, world!

EDIT3:在@glts 评论后map更改。doseq此外,当从 运行时lein repl,它总是产生正确的输出,这让我更加困惑。因此,正如 David Arenas 所指出的,行为似乎取决于上游输出处理。然而,问题仍然存在。

EDIT4:大卫阿里纳斯也在苹果酒中检查了这一点,并且无法重现该行为。似乎它与 Cursive 的 nrepl 输出处理实现有关。

4

1 回答 1

1

Clojure*out*不会为每个线程创建一个实例(它也是静态最终的),但它确实使用了没有原子保证的 OutputStreamWriter。由于您正在写入单个流,因此您需要同步缓冲区上的线程。

如果你使用 nrepl 运行你的代码,你会看到你得到了“正确”的行为。这是因为它们重新绑定使用锁定缓冲区的自己的编写器。

nrepl 的会话:

(defn- session-out
  "Returns a PrintWriter suitable for binding as *out* or *err*.  All of
   the content written to that PrintWriter will (when .flush-ed) be sent on the
   given transport in messages specifying the given session-id.
   `channel-type` should be :out or :err, as appropriate."
  [channel-type session-id transport]
  (let [buf (clojure.tools.nrepl.StdOutBuffer.)]
    (PrintWriter. (proxy [Writer] []
                    (close [] (.flush ^Writer this))
                    (write [& [x ^Integer off ^Integer len]]
                      (locking buf
                        (cond
                          (number? x) (.append buf (char x))
                          (not off) (.append buf x)
                          ; the CharSequence overload of append takes an *end* idx, not length!
                          (instance? CharSequence x) (.append buf ^CharSequence x (int off) (int (+ len off)))
                          :else (.append buf ^chars x off len))
                        (when (<= *out-limit* (.length buf))
                          (.flush ^Writer this))))
                    (flush []
                      (let [text (locking buf (let [text (str buf)]
                                                (.setLength buf 0)
                                                text))]
                        (when (pos? (count text))
                          (t/send (or (:transport *msg*) transport)
                                  (response-for *msg* :session session-id
                                                channel-type text))))))
                  true)))
于 2018-11-05T22:16:52.417 回答