我正在从标准输入中读取字符串
(line-seq (java.io.BufferedReader. *in*))
我怎样才能:
- 将行存储在集合中
- 在某个时间间隔(比如 5 分钟)处理集合,并且
- 无论时间如何,只要它的大小增长到 n(比如 10)就处理集合?
我正在从标准输入中读取字符串
(line-seq (java.io.BufferedReader. *in*))
我怎样才能:
在这里,我给你留下了我的目的:
(line-seq (BufferedReader. xxx))
是一个序列,所以这个函数存储结果(返回一个新的)集合正如@tangrammer 所说,core-async 将是一个不错的选择,或者Lamina (sample-every)
我用一个原子拼凑了一些东西。您可能必须根据需要进行调整(例如并行执行,不使用未来来创建周期性处理线程,返回值,...)。以下代码创建processor-with-interval-and-threshold
了一个函数,该函数创建了另一个函数,该函数可以被赋予一系列元素,这些元素以您描述的方式进行处理。
(defn- periodically!
[interval f]
(future
(while true
(Thread/sleep interval)
(f))))
(defn- build-head-and-tail
[{:keys [head tail]} n elements]
(let [[a b] (->> (concat tail elements)
(split-at n))]
{:head (concat head a) :tail b}))
(defn- build-ready-elements
[{:keys [head tail]}]
{:ready (concat head tail)})
(defn processor-with-interval-and-threshold
[interval threshold f]
(let [q (atom {})]
(letfn [(process-elements! []
(let [{:keys [ready]} (swap! q build-ready-elements)]
(when-not (empty? ready)
(f ready))))]
(periodically! interval process-elements!)
(fn [sq]
(let [{:keys [head]} (swap! q build-head-and-tail threshold sq)]
(when (>= (count head) threshold)
(process-elements!)))))))
原子q
管理三个元素的映射:
:head
:首先填充并根据阈值检查的序列,:tail
:包含超过阈值的元素的序列(可能是惰性的),:ready
: 要处理的元素。现在,您可以例如执行以下操作:
(let [add! (processor-with-interval-and-threshold 300000 10 your-fn)]
(doseq [x (line-seq (java.io.BufferedReader. *in*))]
(add! [x])))
这应该足以让你开始,我猜。