1

clojure 中实现以下操作的最佳方法是什么?

  • 获取发送的消息/数据
  • 对该数据执行某些功能以获得新状态;就像是(fn [state new-msgs] ...)
  • 在该更新期间继续接收消息/数据
  • 完成该更新后,对在此期间发送的所有消息运行相同的更新功能

代理似乎不太合适。必须同时send向代理程序运行和数据,这不会为对上次更新期间传入的所有数据进行操作的函数留出空间。该目标隐含地要求将功能和数据解耦。

Actor 模型似乎更适合于函数和数据的解耦。但是,我所知道的所有参与者框架似乎都假设发送的每条消息都将单独处理。目前尚不清楚如何在不增加额外机器的情况下将其转变为头脑。我知道Pulsar 的演员接受一个:lifecycle-handle可以用来让演员做“特殊技巧”的功能,但是没有很多关于这个的文档,所以不清楚这个功能是否有用。

确实有使用代理、core.async通道和watch函数来解决这个问题,但它有点乱,我希望有更好的解决方案。如果其他人发现它有帮助,我会将其作为解决方案发布,但我想看看其他人提出了什么。

4

3 回答 3

1

代理与您在这里想要的相反 - 它们是一个被发送更新函数的值。这最容易使用队列和线程。为方便起见,我future用来构造线程。

user> (def q (java.util.concurrent.LinkedBlockingDeque.)) 
#'user/q
user> (defn accumulate
        [summary input]
        (let [{vowels true consonents false}
              (group-by #(contains? (set "aeiouAEIOU") %) input)]
          (-> summary
            (update-in [:vowels] + (count vowels))
            (update-in [:consonents] + (count consonents)))))
#'user/accumulate
user> (def worker
           (future (loop [summary {:vowels 0 :consonents 0} in-string (.take q)]
                         (if (not in-string)
                             summary
                           (recur (accumulate summary in-string)
                                  (.take q))))))
#'user/worker
user> (.add q "hello")
true
user> (.add q "goodbye")
true
user> (.add q false)
true
user> @worker
{:vowels 5, :consonents 7}
于 2014-10-15T21:02:48.853 回答
1

这是我使用代理、core.async 通道和监视函数提出的解决方案。同样,它有点乱,但它现在可以满足我的需要。概括地说,就是这样:

(require '[clojure.core.async :as async :refer [>!! <!! >! <! chan go]])

; We'll call this thing a queued-agent
(defprotocol IQueuedAgent
  (enqueue [this message])
  (ping [this]))

(defrecord QueuedAgent [agent queue]
  IQueuedAgent
  (enqueue [_ message]
    (go (>! queue message)))
  (ping [_]
    (send agent identity)))


; Need a function for draining a core async channel of all messages
(defn drain! [c]
  (let [cc (chan 1)]
    (go (>! cc ::queue-empty))
    (letfn
      ; This fn does all the hard work, but closes over cc to avoid reconstruction
      [(drainer! [c]
         (let [[v _] (<!! (go (async/alts! [c cc] :priority true)))]
           (if (= v ::queue-empty)
             (lazy-seq [])
             (lazy-seq (cons v (drainer! c))))))]
      (drainer! c))))

; Constructor function
(defn queued-agent [& {:keys [buffer update-fn init-fn error-handler-builder] :or {:buffer 100}}]
  (let [q                (chan buffer)
        a                (agent (if init-fn (init-fn) {}))
        error-handler-fn (error-handler-builder q a)]
    ; Set up the queue, and watcher which runs the update function when there is new data
    (add-watch
      a
      :update-conv
      (fn [k r o n]
        (let [queued (drain! q)]
          (when-not (empty? queued)
            (send a update-fn queued error-handler-fn)))))
    (QueuedAgent. a q)))

; Now we can use these like this

(def a (queued-agent
         :init-fn   (fn [] {:some "initial value"})
         :update-fn (fn [a queued-data error-handler-fn]
                      (println "Receiving data" queued-data)
                      ; Simulate some work/load on data
                      (Thread/sleep 2000)
                      (println "Done with work; ready to queue more up!"))
         ; This is a little warty at the moment, but closing over the queue and agent lets you requeue work on
         ; failure so you can try again.
         :error-handler-builder
                    (fn [q a] (println "do something with errors"))))

(defn -main []
  (doseq [i (range 10)]
    (enqueue a (str "data" i))
    (Thread/sleep 500) ; simulate things happening
    ; This part stinks... have to manually let the queued agent know that we've queued some things up for it
    (ping a)))

您会注意到,每次添加新数据时都必须在此处 ping queued-agent 非常麻烦。绝对感觉事情被扭曲了典型的用法。

于 2014-10-15T00:51:28.290 回答
0

我想出了一些更接近演员的东西,灵感来自蒂姆·鲍德里奇的演员阵容(第 16 集)。我认为这更干净地解决了这个问题。

(defmacro take-all! [c]
  `(loop [acc# []]
     (let [[v# ~c] (alts! [~c] :default nil)]
       (if (not= ~c :default)
         (recur (conj acc# v#))
         acc#))))


(defn eager-actor [f]
  (let [msgbox (chan 1024)]
    (go (loop [f f]
          (let [first-msg (<! msgbox) ; do this so we park efficiently, and only
                                      ; run when there are actually messages
                msgs      (take-all! msgbox)
                msgs      (concat [first-msg] msgs)]
            (recur (f msgs)))))
    msgbox))


(let [a (eager-actor (fn f [ms]
                       (Thread/sleep 1000) ; simulate work
                       (println "doing something with" ms)
                       f))]
  (doseq [i (range 20)]
    (Thread/sleep 300)
    (put! a i)))
;; =>
;; doing something with (0)
;; doing something with (1 2 3)
;; doing something with (4 5 6)
;; doing something with (7 8 9 10)
;; doing something with (11 12 13)
于 2014-11-18T08:35:08.903 回答