0

我正在从标准输入中读取字符串

(line-seq (java.io.BufferedReader. *in*))

我怎样才能:

  1. 将行存储在集合中
  2. 在某个时间间隔(比如 5 分钟)处理集合,并且
  3. 无论时间如何,只要它的大小增长到 n(比如 10)就处理集合?
4

3 回答 3

1

在这里,我给你留下了我的目的:

  1. 正如您可以在http://clojuredocs.org/clojure_core/clojure.core/line-seq检查结果(line-seq (BufferedReader. xxx))是一个序列,所以这个函数存储结果(返回一个新的)集合
  2. 您可以使用 clojure/core.async 超时功能http://clojure.github.io/core.async/#clojure.core.async/timeout来完成,您可以查看https://github.com/clojure /core.async/blob/master/examples/walkthrough.clj来熟悉这个库
  3. 只需使用条件 (if, when ...) 来检查集合的计数
于 2013-11-12T08:50:28.383 回答
1

正如@tangrammer 所说,core-async 将是一个不错的选择,或者Lamina (sample-every)

于 2013-11-13T00:00:34.237 回答
0

我用一个原子拼凑了一些东西。您可能必须根据需要进行调整(例如并行执行,不使用未来来创建周期性处理线程,返回值,...)。以下代码创建processor-with-interval-and-threshold了一个函数,该函数创建了另一个函数,该函数可以被赋予一系列元素,这些元素以您描述的方式进行处理。

(defn- periodically!
  [interval f]
  (future
    (while true
      (Thread/sleep interval)
      (f))))

(defn- build-head-and-tail
  [{:keys [head tail]} n elements]
  (let [[a b] (->> (concat tail elements)
                   (split-at n))]
    {:head (concat head a) :tail b}))

(defn- build-ready-elements
  [{:keys [head tail]}]
  {:ready (concat head tail)})

(defn processor-with-interval-and-threshold
  [interval threshold f]
  (let [q (atom {})]
    (letfn [(process-elements! []
              (let [{:keys [ready]} (swap! q build-ready-elements)]
                (when-not (empty? ready)
                  (f ready))))]
      (periodically! interval process-elements!)
      (fn [sq]
        (let [{:keys [head]} (swap! q build-head-and-tail threshold sq)]
          (when (>= (count head) threshold)
            (process-elements!)))))))

原子q管理三个元素的映射:

  • :head:首先填充并根据阈值检查的序列,
  • :tail:包含超过阈值的元素的序列(可能是惰性的),
  • :ready: 要处理的元素。

现在,您可以例如执行以下操作:

(let [add! (processor-with-interval-and-threshold 300000 10 your-fn)]
  (doseq [x (line-seq (java.io.BufferedReader. *in*))]
    (add! [x])))          

这应该足以让你开始,我猜。

于 2013-11-12T09:31:32.493 回答