2

我正在使用消息传递工具包(它恰好是Spread,但我不知道细节很重要)。从这个工具包接收消息需要一些样板文件:

  1. 创建与守护程序的连接。
  2. 加入一个小组。
  3. 接收一条或多条消息。
  4. 离开小组。
  5. 断开与守护程序的连接。

遵循我在其他地方看到的一些习语,我能够使用 Spread 的 Java API 和 Clojure 的互操作表单来编写一些工作函数:

(defn connect-to-daemon
  "Open a connection"
  [daemon-spec]
  (let [connection (SpreadConnection.)
        {:keys [host port user]} daemon-spec]
    (doto connection
      (.connect (InetAddress/getByName host) port user false false))))

(defn join-group
  "Join a group on a connection"
  [cxn group-name]
  (doto (SpreadGroup.)
    (.join cxn group-name)))

(defn with-daemon*
  "Execute a function with a connection to the specified daemon"
  [daemon-spec func]
  (let [daemon (merge *spread-daemon* daemon-spec)
        cxn (connect-to-daemon daemon-spec)]
    (try
     (binding [*spread-daemon* (assoc daemon :connection cxn)]
       (func))
     (finally
      (.disconnect cxn)))))

(defn with-group*
  "Execute a function while joined to a group"
  [group-name func]
  (let [cxn (:connection *spread-daemon*)
        grp (join-group cxn group-name)]
    (try
     (binding [*spread-group* grp]
       (func))
     (finally
      (.leave grp)))))

(defn receive-message
  "Receive a single message. If none are available, this will block indefinitely."
  []
  (let [cxn (:connection *spread-daemon*)]
    (.receive cxn)))

(与 基本相同的习语with-open,只是SpreadConnection该类使用disconnect而不是close. Grr。此外,我在这里省略了一些与结构问题无关的宏。)

这工作得很好。我可以从结构内部调用接收消息,例如:

(with-daemon {:host "localhost" :port 4803}
  (with-group "aGroup"
    (... looping ...
      (let [msg (receive-message)] 
        ...))))

在我看来,receive-message如果它是一个产生消息的无限惰性序列,使用起来会更干净。因此,如果我想加入一个群组并获取消息,调用代码应如下所示:

(def message-seq (messages-from {:host "localhost" :port 4803} "aGroup"))
(take 5 message-seq)

我见过很多没有清理的惰性序列的例子,这并不难。问题是上面的第 4 步和第 5 步:离开组并与守护程序断开连接。如何将连接状态和组绑定到序列中,并在不再需要序列时运行必要的清理代码?

4

2 回答 2

6

本文描述了如何使用 clojure-contrib fill-queue 来做到这一点。关于清理 - 关于填充队列的巧妙之处在于,您可以提供一个阻塞函数,如果出现错误或达到某些条件,它会自行清理。您还可以持有对资源的引用以在外部控制它。该序列将终止。因此,根据您的语义要求,您必须选择适合的策略。

于 2009-10-27T02:48:05.360 回答
3

试试这个:

(ns your-namespace
  (:use clojure.contrib.seq-utils))

(defn messages-from [daemon-spec group-name]
  (let [cnx (connect-to-deamon daemon-spec))
        group (connect-to-group cnx group-name)]
    (fill-queue (fn [fill]
                  (if done? 
                      (do
                        (.leave group)
                        (.disconnect cnx)
                        (throw (RuntimeException. "Finished messages"))
                      (fill (.receive cnx))))))

设置好了吗?当您想要终止列表时为 true。此外,在 (.receive cnx) 中引发的任何异常也将终止列表。

于 2009-10-28T13:48:19.113 回答