I am trying to use EventHubsForeachWriter as shown below:

val ehConf = EventHubsConf("YOUR_CONNECTION_STRING") 
val writer = EventHubsForeachWriter(ehConf)

val query =
  streamingSelectDF
    .writeStream
    .foreach(writer)
    .outputMode("update")
    .trigger(ProcessingTime("25 seconds"))
    .start()

But I get a compile error:

notebook:22: error: type mismatch;
 found   : org.apache.spark.sql.eventhubs.EventHubsForeachWriter
 required: org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row]
    .foreach(writer)

1 Answer

Found the answer in this GitHub issue.

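So I think the following should work. The key change is converting the DataFrame to a Dataset[String] with .as[String], so that .foreach expects a ForeachWriter[String] (which appears to be what EventHubsForeachWriter implements) rather than the ForeachWriter[Row] named in the error: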

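import org.apache.spark.sql.functions.{struct, to_json}
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._  // needed for 'body and .as[String]; assumes a SparkSession named spark

val query =
  streamingSelectDF
    // serialize every column of each row into one JSON string column named body
    .select(to_json(struct("*")) as 'body)
    .selectExpr("cast(body as string)")
    .as[String]  // Dataset[String], so foreach accepts a ForeachWriter[String]
    .writeStream
    .foreach(writer)
    .outputMode("update")
    .trigger(Trigger.ProcessingTime("25 seconds"))
    .start()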
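
As a rough illustration of why the .as[String] step matters, here is a minimal sketch that uses no Spark at all. Writer, FakeRow, TypedStream and TextWriter are hypothetical stand-ins I made up for ForeachWriter, Row, Dataset and EventHubsForeachWriter; they only mirror the type shapes, under the assumption that EventHubsForeachWriter is a ForeachWriter[String].

```scala
// Hypothetical stand-ins that mirror only the type shapes involved; not the Spark API.
abstract class Writer[T] {                               // role of ForeachWriter[T]
  def process(value: T): Unit
}

final case class FakeRow(fields: Map[String, String])    // role of Row

final class TypedStream[T](items: Seq[T]) {              // role of Dataset[T]
  // Like foreach on a stream of T, this only accepts a Writer[T].
  def foreach(writer: Writer[T]): Unit = items.foreach(writer.process)
  def map[U](f: T => U): TypedStream[U] = new TypedStream(items.map(f))
}

final class TextWriter extends Writer[String] {          // role of EventHubsForeachWriter (assumed)
  val sent = scala.collection.mutable.ArrayBuffer.empty[String]
  def process(value: String): Unit = sent.append(value)
}

object Demo {
  def main(args: Array[String]): Unit = {
    val rows = new TypedStream(Seq(FakeRow(Map("id" -> "1")), FakeRow(Map("id" -> "2"))))
    val writer = new TextWriter
    // rows.foreach(writer)  // would not compile: found TextWriter, required Writer[FakeRow]
    // Converting each row to a String first makes the element type match the writer.
    rows.map(r => r.fields("id")).foreach(writer)
    println(writer.sent.mkString(", "))
  }
}
```

The commented-out line corresponds to the original .foreach(writer) call on the DataFrame; the map step plays the part of to_json plus .as[String].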
answered 2019-01-29T14:05:21.647