2

我正在从 Spark Streaming 迁移到 Structured Streaming,并且遇到以下代码的问题:

def processDataSet(inputDataset: Dataset[MyMessage], foobar: FooBar) = {
    inputDataset.foreachPartition { partitionIterator =>
      val filteredIterator = partitionIterator.filter(foobar.filter)
      ...
      ...
    }
}       
val streamingQuery = inputDataset
  .writeStream
  .trigger(ProcessingTime("5 seconds"))
  .outputMode("append")
  .format("console")
  .start

它出现以下错误AnalysisException

引起:org.apache.spark.sql.AnalysisException:带有流源的查询必须用writeStream.start();;

流式查询不foreachPartition支持?在这种情况下writeStream.foreach实施的唯一方法是什么?foreachPartition

我想避免发送每个事件,而是累积所有行,形成一个巨大的 POST 请求正文并将其发送到 HTTP 端点。因此,如果一个批次中有 1000 个事件和 5 个分区,则并行生成 5 个请求,每个请求正文中有 200 个事件。

4

1 回答 1

2

TL;DR是的。foreachPartition不支持操作,您应该改用ForeachWriter

引用 foreachPartition 的scaladoc

foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit将函数应用于f此数据集的每个分区。

正如您现在可能已经发现的那样,foreach是一个动作,因此会触发 Spark 执行。

由于您使用流数据集,因此不允许使用“传统”方法(如foreach.

引用结构化流的不受支持的操作

此外,还有一些 Dataset 方法不适用于流数据集。它们是立即运行查询并返回结果的操作,这在流数据集上没有意义。相反,这些功能可以通过显式启动流式查询来完成(参见下一节)。

流式替代方案之一是foreach运营商(又名接收器)。这就是foreachPartition结构化流媒体中的做法。

使用 Foreach引用:

foreach 操作允许对输出数据进行任意操作。

要使用它,您必须实现 interface ForeachWriter,该接口具有在触发器后生成作为输出的行序列时调用的方法。


我想避免发送每个事件,而是累积所有行,形成一个巨大的 POST 请求正文并将其发送到 HTTP 端点。因此,如果一个批次中有 1000 个事件和 5 个分区,则并行生成 5 个请求,每个请求正文中有 200 个事件。

在将数据集写入接收器之前,这似乎是一种聚合,不是吗?使用groupBy运算符和collect_list函数对行进行分组,以便writeStream您拥有任意数量的组。

除非没有其他方法,否则我宁愿避免处理 RDD 的这种称为分区的低级特性作为优化写入的一种方式。

于 2017-07-06T04:48:43.627 回答