I have two versions of the same Spark code. The first one uses Structured Streaming with a Kafka source:
dfStream.printSchema()
//root
//|-- dt: string (nullable = true)
//|-- ip: string (nullable = true)
//|-- device: string (nullable = true)
val dfWindowed = dfStream
.groupBy($"ip")
.agg(concat_ws(",", collect_list($"device")).alias("devices"))
.writeStream
.outputMode("complete")
.format("memory")
.start()
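For reference, dfStream is built roughly like this; the broker address, topic name, and the JSON parsing below are placeholders rather than my exact setup:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StringType, StructType}
import spark.implicits._

// placeholder schema matching the printSchema() output above
val logSchema = new StructType()
  .add("dt", StringType)
  .add("ip", StringType)
  .add("device", StringType)

val dfStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder broker
  .option("subscribe", "device_logs")                 // placeholder topic
  .load()
  .select(from_json($"value".cast("string"), logSchema).as("log"))
  .select("log.*")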
The second one reads from files instead. The data, however, is really the same as above:
logDF.printSchema()
//root
//|-- dt: string (nullable = true)
//|-- ip: string (nullable = true)
//|-- device: string (nullable = true)
logDF.repartition(32)
.groupBy("ip")
.agg(concat_ws(",", collect_list($"device")).alias("devices"))
The problem is that while the second version works just fine, the first one keeps giving me the following error:
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:284)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 1.0 failed 4 times, most recent failure: Lost task 9.3 in stage 1.0 (TID 28, c3-hadoop-prc-st3417.bj, executor 3): java.lang.RuntimeException: Collect cannot be used in partial aggregations.
That's a long one... but the actual error seems to be this:
java.lang.RuntimeException: Collect cannot be used in partial aggregations.
I have found a few related SO questions, but no solution so far. Any advice would be much appreciated on:
- what "partial aggregations" means here, and why a static (non-streaming) Dataset does not run into this problem,
- a possible workaround... (for example, would something along the lines of the UDAF sketch below work?)
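One direction I have been considering, though I am not sure it is the right approach, is a custom UDAF that behaves like collect_list but implements merge, so that partial aggregation becomes possible. A rough, untested sketch (class and variable names are made up):

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// collect_list-like aggregate that supports merging partial buffers
class CollectAsList extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = new StructType().add("value", StringType)
  override def bufferSchema: StructType = new StructType().add("values", ArrayType(StringType))
  override def dataType: DataType = ArrayType(StringType)
  override def deterministic: Boolean = true

  // start each buffer with an empty list
  override def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer(0) = Seq.empty[String]

  // append one input value to the buffer
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0))
      buffer(0) = buffer.getSeq[String](0) :+ input.getString(0)

  // combine two partial buffers (this is what collect_list apparently cannot do)
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0)

  override def evaluate(buffer: Row): Any = buffer.getSeq[String](0)
}

It would then replace collect_list in the streaming query:

val collectDevices = new CollectAsList
dfStream
  .groupBy($"ip")
  .agg(concat_ws(",", collectDevices($"device")).alias("devices"))

Is something like this expected to work with a streaming groupBy, or is there a more idiomatic way around the restriction?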