我正在使用消息传递工具包(它恰好是Spread,但我不知道细节很重要)。从这个工具包接收消息需要一些样板文件:
- 创建与守护程序的连接。
- 加入一个小组。
- 接收一条或多条消息。
- 离开小组。
- 断开与守护程序的连接。
遵循我在其他地方看到的一些习语,我能够使用 Spread 的 Java API 和 Clojure 的互操作表单来编写一些工作函数:
(defn connect-to-daemon
"Open a connection"
[daemon-spec]
(let [connection (SpreadConnection.)
{:keys [host port user]} daemon-spec]
(doto connection
(.connect (InetAddress/getByName host) port user false false))))
(defn join-group
"Join a group on a connection"
[cxn group-name]
(doto (SpreadGroup.)
(.join cxn group-name)))
(defn with-daemon*
"Execute a function with a connection to the specified daemon"
[daemon-spec func]
(let [daemon (merge *spread-daemon* daemon-spec)
cxn (connect-to-daemon daemon-spec)]
(try
(binding [*spread-daemon* (assoc daemon :connection cxn)]
(func))
(finally
(.disconnect cxn)))))
(defn with-group*
"Execute a function while joined to a group"
[group-name func]
(let [cxn (:connection *spread-daemon*)
grp (join-group cxn group-name)]
(try
(binding [*spread-group* grp]
(func))
(finally
(.leave grp)))))
(defn receive-message
"Receive a single message. If none are available, this will block indefinitely."
[]
(let [cxn (:connection *spread-daemon*)]
(.receive cxn)))
(与 基本相同的习语with-open
,只是SpreadConnection
该类使用disconnect
而不是close
. Grr。此外,我在这里省略了一些与结构问题无关的宏。)
这工作得很好。我可以从结构内部调用接收消息,例如:
(with-daemon {:host "localhost" :port 4803}
(with-group "aGroup"
(... looping ...
(let [msg (receive-message)]
...))))
在我看来,receive-message
如果它是一个产生消息的无限惰性序列,使用起来会更干净。因此,如果我想加入一个群组并获取消息,调用代码应如下所示:
(def message-seq (messages-from {:host "localhost" :port 4803} "aGroup"))
(take 5 message-seq)
我见过很多没有清理的惰性序列的例子,这并不难。问题是上面的第 4 步和第 5 步:离开组并与守护程序断开连接。如何将连接状态和组绑定到序列中,并在不再需要序列时运行必要的清理代码?