希望有人可以提供帮助。
我正在尝试将一些数据流式传输并将 IoT 设备的当前状态保存到 Kudu 中。
我目前正在为接收器使用 ForeachWriter - 遗憾的是,它仅在有一行时才有效,如果有不止一行,它会挂起并且不会将任何数据写入 Kudu 表。
有没有人见过这个?
代码:
df.select("...DATA....." )
.as[IoTState]
.groupByKey(_.assetId)
.mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateIoTState)
.writeStream
.foreach(new ForeachWriter[IoTState]
{
override def open(partitionId: Long, version: Long): Boolean = {
true
}
override def process(value: IoTState): Unit = {
val valueDF: DataFrame = Seq(value).toDF(
"assetId"
, "eventDateTimeInUTC"
, "gpsLatitudeInDegrees"
, "gpsLongitudeInDegrees"
)
kuduContext1.upsertRows(valueDF, conf.kuduTable)
}
override def close(errorOrNull: Throwable): Unit = {
}
})
.outputMode("update")
.trigger(Trigger.ProcessingTime("2 seconds"))
.start()
.awaitTermination()