6

场景:我有一台服务器正在监听六个活动的 TCP/IP 连接。当“就绪”消息进入时,将在其自己的线程上引发一个事件。当服务器收到来自每个连接的“就绪”消息时,它需要运行“启动”功能。

我的面向对象解决方案可能涉及使用互斥锁和计数器。就像是:

int _countDown= 6;
object _lock;
void ReadyMessageReceivedForTheFirstTimeFromAConnection() {
    lock(_lock) {
      --_countDown; // 
       if (_countDown==0) Start();
    }
}

在不使用锁/互斥锁的情况下,如何在 Clojure 中解决这个问题?

4

3 回答 3

6

为此,您可以使用 CountDownLatch 或 Phaser。

在我的期货库中,迫在眉睫,我同时使用了两者。首先CountDownLatch,然后用Phaser替换它以实现 ForkJoin 兼容性(在您的情况下可能不需要)。你可以看到这个 diff的变化。希望它能让您了解两者的用法。

使用闩锁,一般的想法是:

(let [latch (CountDownLatch. 6)]
  (on-receive-message this (fn [_] (.countDown latch)))
  (.await latch)

...或类似的东西。

于 2016-02-11T23:24:33.483 回答
6

当您更喜欢纯 clojure 版本时,您可以使用 promise 来尝试一下。

每次收到消息时,都会增加 conn-count,手表会检查是否达到阈值并传递 :go to the barrier promise。

(def wait-barrier (promise))
(def conn-count (atom 0))

(add-watch conn-count :barrier-watch
           (fn [key ref old-state new-state]
             (when (== new-state 6)
               (deliver wait-barrier :go))))  

虚拟示例:

(def wait-barrier (promise))
(def conn-count (atom 0))
(defn worker-dummy []
  (when (= @wait-barrier :go)
    (println "I'm a worker")))

(defn dummy-receive-msg []
  (doall (repeatedly 6,
                     (fn []
                       (println "received msg")
                       (swap! conn-count inc)))))

(let [workers (doall (repeatedly 6 (fn [] (future (worker-dummy)))))]
  (add-watch conn-count :barrier-watch
             (fn [key ref old-state new-state]
               (when (== new-state 6)
                 (deliver wait-barrier :go))))
  (dummy-receive-msg)
  (doall (map deref workers)))
于 2016-02-12T00:44:52.103 回答
2

由于到目前为止尚未提及:您可以使用core.async轻松做到这一点。看看这个MCVE

(let [conn-count 6
      ready-chan (chan)]

  ;; Spawn a thread for each connection.
  (doseq [conn-id (range conn-count)]
    (thread
      (Thread/sleep (rand-int 2000))
      (>!! ready-chan conn-id)))

  ;; Block until all connections are established.
  (doseq [total (range 1 (inc conn-count))]
    (println (<!! ready-chan) "connected," total "overall"))

  ;; Invoke start afterwards.
  (println "start"))
;; 5 connected, 1 overall
;; 3 connected, 2 overall
;; 4 connected, 3 overall
;; 0 connected, 4 overall
;; 1 connected, 5 overall
;; 2 connected, 6 overall
;; start
;;=> nil

您还可以使用通道来实现倒计时锁存器(借用自Christophe Grand):

(defn count-down-latch-chan [n]
  (chan 1 (comp (drop (dec n)) (take 1))))

有关 core.async 的简短介绍,请查看此 Gist。如需更长的内容,请阅读“Clojure for the Brave and True”中的相应章节

于 2017-01-10T09:57:22.657 回答