0

希望有人可以提供帮助。

我正在尝试将一些数据流式传输并将 IoT 设备的当前状态保存到 Kudu 中。

我目前正在为接收器使用 ForeachWriter - 遗憾的是,它仅在有一行时才有效,如果有不止一行,它会挂起并且不会将任何数据写入 Kudu 表。

有没有人见过这个?

代码:

    df.select("...DATA....." )
          .as[IoTState]
          .groupByKey(_.assetId)
          .mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateIoTState)
          .writeStream
          .foreach(new ForeachWriter[IoTState]
                  {
                    override def open(partitionId: Long, version: Long): Boolean = {
                                        true

                                      }
                                      override def process(value: IoTState): Unit = {

                                        val valueDF: DataFrame = Seq(value).toDF(
                                          "assetId"
                                          , "eventDateTimeInUTC"
                                          , "gpsLatitudeInDegrees"
                                          , "gpsLongitudeInDegrees"
                                        )
kuduContext1.upsertRows(valueDF, conf.kuduTable)
                                    }
                                  override def close(errorOrNull: Throwable): Unit = {
                                  }
                                })
          .outputMode("update")
          .trigger(Trigger.ProcessingTime("2 seconds"))
          .start()
          .awaitTermination()
4

0 回答 0