TLDR - 不支持;在某些情况下,解决方法是可能的。
更长的版本 -
- (一个黑客)
在某些情况下,解决方法是可能的,例如,如果您希望count(distinct)
在低基数列上的流式查询中有多个,那么通过将approx_count_distinct
参数设置得足够低(这是第二个approx_count_distinct 的可选参数,默认为)。rsd
0.05
这里如何定义“低基数”?对于具有超过 1000 个唯一值的列,我不建议使用这种方法。
因此,在您的流式查询中,您可以执行以下操作 -
(spark.readStream....
.groupBy("site_id")
.agg(approx_count_distinct("domain", 0.001).alias("distinct_domains")
, approx_count_distinct("country", 0.001).alias("distinct_countries")
, approx_count_distinct("language", 0.001).alias("distinct_languages")
)
)
这是它确实有效的证明:
注意 count(distinct) 并count_approx_distinct
给出相同的结果!rsd
以下是有关参数的一些指导count_approx_distinct
:
- 对于具有0.02的 100 个不同值
rsd
的列是必要的;
- 对于具有0.001的 1000 个不同值
rsd
的列是必要的。
PS。另请注意,我必须在具有 10k 个不同值的列上注释掉实验,因为我没有足够的耐心来完成它。这就是为什么我提到你不应该对具有超过 1k 个不同值的列使用这个 hack。为了匹配超过 1k 个不同值的精确计数(不同),对于HyperLogLogPlusPlus 算法的设计目标(该算法落后于 approx_count_distinct 实现)而言approx_count_distinct
,需要rsd
的方式太低了。
- (很好但更涉及的方式)
正如其他人提到的,您可以使用 Spark 的任意状态流来实现您自己的聚合;以及使用[flat]MapWithGroupState在单个流上进行尽可能多的聚合。这将是一种合法且受支持的方式,与上述仅在某些情况下有效的黑客方式不同。此方法仅适用于 Spark Scala API,不适用于 PySpark。
- (也许有一天这将是一个长期的解决方案)
正确的方法是在 Spark Streaming 中显示对本机多重聚合的支持 - https://github.com/apache/spark/pull/23576 - 对此 SPARK jira/ PR 投赞成票,如果您有兴趣,请表示支持在这。