你可能想看看讲述人。它专为“分析和聚合数据流”而设计。
一个简单query-seq
的做你最初追求的是:
(require '[narrator.query :refer [query-seq query-stream]])
(require '[narrator.operators :as n])
(def my-seq [:a :a :b :b :b :b :c :a :b :c])
(query-seq (n/group-by identity n/rate) my-seq)
==> {:a 3, :b 5, :c 2}
您可以按照您的建议进行过滤。
您可以使用它quasi-cardinality
来快速确定样本中唯一项目的数量(以及您的分区问题)。它为此使用 HyperLogLog 基数估计算法,例如
(query-seq (n/quasi-cardinality) my-seq)
==> 3
quasi-frequency-by
在这里演示:
(defn freq-in-seq
"returns a function that, when given a value, returns the frequency of that value in the sequence s
e.g. ((freq-in-seq [:a :a :b :c]) :a) ==> 2"
[s]
(query-seq (n/quasi-frequency-by identity) s))
((freq-in-seq my-seq) :a) ==> 3
quasi-distinct-by
:
(query-seq (n/quasi-distinct-by identity) my-seq)
==> [:a :b :c]
还有实时流分析query-stream
。
这是向您展示如何对流进行采样以获取读取的“周期”值的更改计数的一些内容:
(s/stream->seq
(->> my-seq
(map #(hash-map :timestamp %1 :value %2) (range))
(query-stream (n/group-by identity n/rate)
{:value :value :timestamp :timestamp :period 3})))
==> ({:timestamp 3, :value {:a 2, :b 1}} {:timestamp 6, :value {:b 3}} {:timestamp 9, :value {:a 1, :b 1, :c 1}} {:timestamp 12, :value {:c 1}})
结果是每 3 个项目(周期 3)发生一系列更改,并带有适当的时间戳。
您还可以编写自定义流聚合器,这可能是您在上面的流中累积值的方式。我对这些进行了快速尝试,但未能使其正常工作(目前仅在我的午休时间),但这可以代替它:
(defn lazy-value-accum
([s] (lazy-value-accum s {}))
([s m]
(when-not (empty? s)
(lazy-seq
(let [new-map (merge-with + m (:value (first s)))]
(cons new-map
(lazy-value-accum (rest s) new-map))))))
(lazy-value-accum
(s/stream->seq
(->> my-seq
(map #(hash-map :timestamp %1 :value %2) (range))
(query-stream (n/group-by identity n/rate)
{:value :value :timestamp :timestamp :period 3}))))
==> ({:a 2, :b 1} {:a 2, :b 4} {:a 3, :b 5, :c 1} {:a 3, :b 5, :c 2})
它显示了每个样本后每个值的逐渐累积计数period
,可以懒惰地读取。