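I ended up rolling my own small library, which I call simple-queue. You can read the full documentation on GitHub, but here is the source in its entirety. I am not going to keep this answer updated, so if you want to use the library, please get the source from GitHub.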
(ns com.github.bdesham.simple-queue)

(defn new-queue
  "Creates a new queue. Each trigger from the timer will cause the function f
  to be invoked with the next item from the queue. The queue begins processing
  immediately, which in practice means that the first item to be added to the
  queue is processed immediately."
  [f & opts]
  (let [options (into {:delaytime 1}
                      (select-keys (apply hash-map opts) [:delaytime])),
        delaytime (:delaytime options),
        queue {:queue (java.util.concurrent.LinkedBlockingDeque.)},
        task (proxy [java.util.TimerTask] []
               (run []
                 (let [item (.takeFirst (:queue queue)),
                       value (:value item),
                       prom (:promise item)]
                   (if prom
                     (deliver prom (f value))
                     (f value))))),
        timer (java.util.Timer.)]
    (.schedule timer task 0 (int (* 1000 delaytime)))
    (assoc queue :timer timer)))

(defn cancel
  "Permanently stops execution of the queue. If a task is already executing
  then it proceeds unharmed."
  [queue]
  (.cancel (:timer queue)))

(defn process
  "Adds an item to the queue, blocking until it has been processed. Returns
  (f item)."
  [queue item]
  (let [prom (promise)]
    (.offerLast (:queue queue)
                {:value item,
                 :promise prom})
    @prom))

(defn add
  "Adds an item to the queue and returns immediately. The value of (f item) is
  discarded, so presumably f has side effects if you're using this."
  [queue item]
  (.offerLast (:queue queue)
              {:value item,
               :promise nil}))
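To make the mechanism easier to see in isolation, here is a minimal sketch of the same idea reduced to its core: a timer task that takes one item from a blocking deque per tick. It is not part of simple-queue. The names items, seen and timer, and the 50 ms period, are assumptions chosen for the demo. Unlike the source above, the sketch uses a daemon timer thread, so that a task left blocked in .takeFirst on an empty deque cannot keep the JVM alive.

```clojure
(import '(java.util Timer TimerTask)
        '(java.util.concurrent LinkedBlockingDeque))

;; Demo-only state (hypothetical names, not from simple-queue)
(def items (LinkedBlockingDeque.))
(def seen (atom []))

;; (Timer. true) creates a daemon timer thread; the library uses (Timer.)
(def timer (Timer. true))

(.schedule timer
           (proxy [TimerTask] []
             (run []
               ;; .takeFirst blocks the timer thread until an item is available
               (swap! seen conj (.takeFirst items))))
           (long 0)    ; initial delay in milliseconds
           (long 50))  ; period in milliseconds, arbitrary for this demo

;; .offerLast returns immediately; the timer task consumes one item per tick
(doseq [x [:a :b :c]]
  (.offerLast items x))

(Thread/sleep 500)  ; generous margin for three 50 ms ticks
(.cancel timer)     ; no further ticks are scheduled after this

@seen
;; => [:a :b :c]
```

The items come out in insertion order because .offerLast appends to the tail of the deque and .takeFirst removes from the head.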
An example of using this queue for return values:
(require '[com.github.bdesham.simple-queue :as q])
(def url-queue (q/new-queue slurp :delaytime 30))
(def github (q/process url-queue "https://github.com"))
(def google (q/process url-queue "http://www.google.com"))
The calls to q/process will block, so there will be a 30-second delay between the two def statements.
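Because q/process dereferences a promise, the caller waits for as long as that promise stays undelivered. If that is a concern (for example, if f throws inside the timer task, the promise is never delivered), one option is the three-argument form of deref, which takes a timeout. The following is only a sketch: process-with-timeout is a hypothetical helper that is not part of simple-queue, and it assumes the queue map has the same :queue key that new-queue creates.

```clojure
(import 'java.util.concurrent.LinkedBlockingDeque)

;; Hypothetical helper, not part of simple-queue: like q/process, but gives up
;; after timeout-ms milliseconds and returns timeout-val instead of blocking.
(defn process-with-timeout
  [queue item timeout-ms timeout-val]
  (let [prom (promise)]
    (.offerLast ^LinkedBlockingDeque (:queue queue)
                {:value item,
                 :promise prom})
    (deref prom timeout-ms timeout-val)))

;; A bare queue map with no timer attached, so nothing ever consumes the item
;; and the promise is never delivered. This stands in for a stalled queue.
(def idle-queue {:queue (LinkedBlockingDeque.)})

(process-with-timeout idle-queue "https://github.com" 100 :timed-out)
;; => :timed-out
```

Note that a timed-out item stays in the deque, so a queue that is merely slow would still process it later; the caller just stops waiting for the result.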
An example of using this queue purely for side effects:
(defn cache-url
  [{url :url, filename :filename}]
  (spit (java.io.File. filename)
        (slurp url)))
(def url-queue (q/new-queue cache-url :delaytime 30))
(q/add url-queue {:url "https://github.com",
                  :filename "github.html"}) ; returns immediately
(q/add url-queue {:url "https://google.com",
                  :filename "google.html"}) ; returns immediately
The calls to q/add now return immediately.