20

I would like to do multiple aggregations in Spark Structured Streaming.

Something like this:

  • Read a stream of input files (from a folder)
  • Perform aggregation 1 (with some transformations)
  • Perform aggregation 2 (and more transformations)

When I run this in Structured Streaming, it gives me an error "Multiple streaming aggregations are not supported with streaming DataFrames/Datasets".

Is there a way to do such multiple aggregations in Structured Streaming?

4

8 回答 8

15

这不受支持,但也有其他方法。就像执行单个聚合并将其保存到 kafka 一样。从 kafka 中读取并再次应用聚合。这对我有用。

于 2017-08-04T03:00:08.983 回答
5

与 Spark 2.4.4(目前最新)一样,不支持可以使用 .foreachBatch()方法的多流聚合

一个虚拟的例子:

query =  spark
        .readStream
        .format('kafka')
        .option(..)
        .load()

       .writeStream
       .trigger(processingTime='x seconds')
       .outputMode('append')
       .foreachBatch(foreach_batch_function)
       .start()

query.awaitTermination()        


def foreach_batch_function(df, epoch_id):
     # Transformations (many aggregations)
     pass   
于 2020-01-20T08:40:44.837 回答
3

从 Spark 2.4 开始,不支持 Spark 结构化流中的多个聚合。支持这一点可能很棘手,尤其是。事件时间处于“更新”模式,因为聚合输出可能会随着迟到的事件而改变。在“附加”模式下支持这一点非常简单,但是 spark 还不支持真正的水印。

这是一个以“附加”模式添加它的建议 - https://github.com/apache/spark/pull/23576

如果有兴趣,您可以观看 PR 并在那里发布您的投票。

于 2019-02-07T22:29:02.360 回答
1

TLDR - 不支持;在某些情况下,解决方法是可能的。

更长的版本 -

  1. (一个黑客)

在某些情况下,解决方法是可能的,例如,如果您希望count(distinct)在低基数列上的流式查询中有多个,那么通过approx_count_distinct参数设置得足够低(这是第二个approx_count_distinct 的可选参数,默认为)。rsd0.05

这里如何定义“低基数”?对于具有超过 1000 个唯一值的列,我不建议使用这种方法。

因此,在您的流式查询中,您可以执行以下操作 -

(spark.readStream....
      .groupBy("site_id")
      .agg(approx_count_distinct("domain", 0.001).alias("distinct_domains")
         , approx_count_distinct("country", 0.001).alias("distinct_countries")
         , approx_count_distinct("language", 0.001).alias("distinct_languages")
      )
  )

这是它确实有效的证明:

在此处输入图像描述

注意 count(distinct) 并count_approx_distinct给出相同的结果!rsd以下是有关参数的一些指导count_approx_distinct

  • 对于具有0.02的 100 个不同值rsd的列是必要的;
  • 对于具有0.001的 1000 个不同值rsd的列是必要的。

PS。另请注意,我必须在具有 10k 个不同值的列上注释掉实验,因为我没有足够的耐心来完成它。这就是为什么我提到你不应该对具有超过 1k 个不同值的列使用这个 hack。为了匹配超过 1k 个不同值的精确计数(不同),对于HyperLogLogPlusPlus 算法的设计目标(该算法落后于 approx_count_distinct 实现)而言approx_count_distinct,需要rsd的方式太低了。

  1. (很好但更涉及的方式)

正如其他人提到的,您可以使用 Spark 的任意状态流来实现您自己的聚合;以及使用[flat]MapWithGroupState在单个流上进行尽可能多的聚合。这将是一种合法且受支持的方式,与上述仅在某些情况下有效的黑客方式不同。此方法仅适用于 Spark Scala API,不适用于 PySpark。

  1. (也许有一天这将是一个长期的解决方案)

正确的方法是在 Spark Streaming 中显示对本机多重聚合的支持 - https://github.com/apache/spark/pull/23576 - 对此 SPARK jira/ PR 投赞成票,如果您有兴趣,请表示支持在这。

于 2020-10-24T05:55:59.023 回答
0

您没有提供任何代码,所以我将使用此处引用的示例代码。

假设下面是我们供 DF 使用的初始代码。

import pyspark.sql.functions as F
spark = SparkSession. ...

# Read text from socket
socketDF = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

socketDF.isStreaming()    # Returns True for DataFrames that have streaming sources

socketDF.printSchema()

# Read all the csv files written atomically in a directory
userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
    .readStream \
    .option("sep", ";") \
    .schema(userSchema) \
    .csv("/path/to/directory")  # Equivalent to format("csv").load("/path/to/directory")

这里按名称对df 进行分组并应用聚合函数countsumbalance

grouped = csvDF.groupBy("name").agg(F.count("name"), F.sum("age"), F.avg("age"))
于 2018-12-02T20:23:07.840 回答
0

对于 spark 2.2 及更高版本(不确定早期版本),如果您可以将聚合设计为使用带有附加模式的flatMapGroupWithState ,您可以根据需要进行任意数量的聚合。这里提到了限制Spark 结构化流 - 输出模式

于 2018-06-24T05:55:07.023 回答
0

这在 Spark 2.0 中不受支持,因为结构化流 API 仍处于试验阶段。请参阅此处以查看所有当前限制的列表。

于 2016-12-07T07:33:21.603 回答
0

从 spark 结构化流 2.4.5 开始,无状态处理不支持多个聚合。但是如果您需要有状态的处理,可以多次聚合。

使用追加模式,您可以flatMapGroupWithState在分组数据集(通过使用groupByKeyAPI 获得)上多次使用 API。

于 2020-02-27T01:55:22.273 回答