我正在从 Spark Streaming 迁移到 Structured Streaming,并且遇到以下代码的问题:
def processDataSet(inputDataset: Dataset[MyMessage], foobar: FooBar) = {
inputDataset.foreachPartition { partitionIterator =>
val filteredIterator = partitionIterator.filter(foobar.filter)
...
...
}
}
val streamingQuery = inputDataset
.writeStream
.trigger(ProcessingTime("5 seconds"))
.outputMode("append")
.format("console")
.start
它出现以下错误AnalysisException
:
引起:org.apache.spark.sql.AnalysisException:带有流源的查询必须用writeStream.start();;
流式查询不foreachPartition
支持?在这种情况下writeStream.foreach
实施的唯一方法是什么?foreachPartition
我想避免发送每个事件,而是累积所有行,形成一个巨大的 POST 请求正文并将其发送到 HTTP 端点。因此,如果一个批次中有 1000 个事件和 5 个分区,则并行生成 5 个请求,每个请求正文中有 200 个事件。