1

我正在尝试在纱线集群上使用 Kafka 测试结构化流的 foreach 并拥有这段小代码:

  val ds1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "data01:9092,data02:9092,data03:9092")
  .option("subscribe", "cluster-topic-01")
  .load()


  /* This block works
  val query = ds1.writeStream
        .format("console")
        .start()
  */

  // this block doesn't have any call to open()
  val query = ds1.writeStream.foreach(new ForeachWriter[Row] {
          override def open(partitionId: Long, version: Long): Boolean = {
                println("open("+partitionId+","+version+")")
                true
          }

          override def process(record: Row) : Unit = {
            // write string to connection
                println("process() :: "+record)
          }

          override def close(errorOrNull: Throwable): Unit = {
            // close the connection
                println("close()")
          }
  }).start()

  query.awaitTermination()

当我运行这个程序时,没有调用 open()。我已经比较了其他线程的其他示例,但我无法找出缺少的内容。在 foreach 之后有一个 start() 调用,这似乎与此处的文档相匹配。

我可以流式传输到文件或控制台,但无法在 foreach 中获得任何调用。

关于在哪里看的任何建议?TIA。

4

1 回答 1

0

open,process并将close在 executor 中运行RDD.foreach,因此除非您使用本地模式,否则您将无法看到它。

于 2017-03-29T00:19:56.187 回答