4

我有一个很大的值列表,从 0 到 100,000 范围内(为了清楚起见,这里用字母表示)。每个输入中可能有几千个项目。

[a a a a b b b b c f d b c f ... ]

我想找到计数超过某个阈值的数字计数。例如,如果阈值为 3,则答案为{a: 4, b: 5}

显而易见的方法是按身份分组,计算每个分组,然后过滤。

这是一个与语言无关的问题,但在 Clojure 中(如果您不了解 Clojure,请不要推迟!):

(filter (fn [[k cnt]] (> cnt threshold)) (frequencies input))

这个函数运行在非常多的输入上,每个输入都非常大,所以分组和过滤是一个昂贵的操作。如果输入永远不能产生超过给定阈值的任何输出或以其他方式划分问题空间,我想找到某种保护函数,该函数将提前返回。例如,最简单的是if the size of the input is less than the size of the threshold return nil.

我正在寻找一个更好的保护函数,如果输入不能产生任何输出,它将跳过计算。或者一种更快的方式来产生输出。

显然,它必须比分组本身便宜。一个很好的解决方案涉及通过不同的输入集对输入进行计数,但最终与分组一样昂贵......

我有一个想法,概率数据结构可能是关键。有任何想法吗?

(我标记了 hyerloglog,虽然我认为它不适用,因为它不提供计数)

4

3 回答 3

1

你可能想看看讲述人。它专为“分析和聚合数据流”而设计。

一个简单query-seq的做你最初追求的是:

(require '[narrator.query :refer [query-seq query-stream]])
(require '[narrator.operators :as n])

(def my-seq [:a :a :b :b :b :b :c :a :b :c])
(query-seq (n/group-by identity n/rate) my-seq)
==> {:a 3, :b 5, :c 2}

您可以按照您的建议进行过滤。

您可以使用它quasi-cardinality来快速确定样本中唯一项目的数量(以及您的分区问题)。它为此使用 HyperLogLog 基数估计算法,例如

(query-seq (n/quasi-cardinality) my-seq)
==> 3

quasi-frequency-by在这里演示:

(defn freq-in-seq
  "returns a function that, when given a value, returns the frequency of that value in the sequence s
   e.g. ((freq-in-seq [:a :a :b :c]) :a)  ==> 2"
  [s]
  (query-seq (n/quasi-frequency-by identity) s))

((freq-in-seq my-seq) :a) ==> 3

quasi-distinct-by

(query-seq (n/quasi-distinct-by identity) my-seq)
==> [:a :b :c]

还有实时流分析query-stream

这是向您展示如何对流进行采样以获取读取的“周期”值的更改计数的一些内容:

(s/stream->seq 
  (->> my-seq
       (map #(hash-map :timestamp %1 :value %2) (range))
       (query-stream (n/group-by identity n/rate) 
                     {:value :value :timestamp :timestamp :period 3})))
==> ({:timestamp 3, :value {:a 2, :b 1}} {:timestamp 6, :value {:b 3}} {:timestamp 9, :value {:a 1, :b 1, :c 1}} {:timestamp 12, :value {:c 1}})

结果是每 3 个项目(周期 3)发生一系列更改,并带有适当的时间戳。

您还可以编写自定义流聚合器,这可能是您在上面的流中累积值的方式。我对这些进行了快速尝试,但未能使其正常工作(目前仅在我的午休时间),但这可以代替它:

(defn lazy-value-accum
  ([s] (lazy-value-accum s {}))
  ([s m]
   (when-not (empty? s)
     (lazy-seq
      (let [new-map (merge-with + m (:value (first s)))]
        (cons new-map
              (lazy-value-accum (rest s) new-map))))))


(lazy-value-accum
  (s/stream->seq 
    (->> my-seq
         (map #(hash-map :timestamp %1 :value %2) (range))
         (query-stream (n/group-by identity n/rate) 
                       {:value :value :timestamp :timestamp :period 3}))))
==> ({:a 2, :b 1} {:a 2, :b 4} {:a 3, :b 5, :c 1} {:a 3, :b 5, :c 2})

它显示了每个样本后每个值的逐渐累积计数period,可以懒惰地读取。

于 2015-10-12T13:51:26.267 回答
1

如何使用 partition-all 生成最大大小为 n 的分区的惰性列表,在每个分区上应用频率,合并它们,然后过滤最终映射?

(defn lazy-count-and-filter
  [coll n threshold]
  (filter #(< threshold (val %))
          (apply (partial merge-with +) 
                 (map frequencies 
                      (partition-all n coll)))))

前任:

(lazy-count-and-filter [:a :c :b :c :a :d :a] 2 1)
==> ([:a 3] [:c 2])
于 2015-10-13T15:36:23.907 回答
0

如果您希望加快单个节点上的工作,请考虑使用 reducers 或 core.async,如这篇博文所述。

如果这是一个非常大的数据集,并且经常需要此操作,并且您有资源拥有一个多节点集群,您可以考虑设置 Storm 或 Onyx。

实际上,听起来减速器将以最少的工作量为您带来最大的收益。使用我列出的所有选项,更强大/更灵活/更快的解决方案需要更多时间来理解。从最简单到最强大的顺序,它们是 reducers、core.async、Storm、Onyx。

于 2015-10-17T06:56:43.520 回答