0

我有一个活泼的流表,可以从 kafka 主题中读取 json。java.sql.Timestamp经过一些工作,我已经完成了这项工作,但是在尝试将值从我的SensorData对象映射到流表 时遇到了问题。

错误发生org.apache.spark.sql.catalyst.CatalystTypeConverters在此方法的第 318 行:

  private object StringConverter extends CatalystTypeConverter[Any, String, UTF8String] {
    override def toCatalystImpl(scalaValue: Any): UTF8String = scalaValue match {
      case str: String => UTF8String.fromString(str)
      case utf8: UTF8String => utf8
    }
    override def toScala(catalystValue: UTF8String): String =
      if (catalystValue == null) null else catalystValue.toString
    override def toScalaImpl(row: InternalRow, column: Int): String =
      row.getUTF8String(column).toString
  }

我进行了调试,代码显然在这里期待一个字符串值,但我的 sensorData 对象(和流表)传感器和收集时间是时间戳。因此,它抱怨无法转换价值。

下面是我的 SensorData 类,我用它来映射来自 Kafka 的传入 json 消息中的值。在我的自定义转换器中,然后我将这些值映射到我Seq[Row]toRows(...)方法中。

class SensorData {
    var sensor_id: String = _
    var metric: String = _
    var collection_time: java.sql.Timestamp = _
    var sensor_time: java.sql.Timestamp = _
//    var collection_time: String = _
//    var sensor_time: String = _
    var value: String = _
    var year_num: Int = _
    var month_num: Int = _
    var day_num: Int = _
    var hour_num: Int = _

}

这是我的流表:

snsc.sql(s"CREATE STREAM TABLE sensor_data_stream if not exists " +
        "(sensor_id string, " +
        "metric string, " +
        "collection_time TIMESTAMP, " +
        "value VARCHAR(128), " +
        "sensor_time TIMESTAMP, " +
        "year_num integer, " +
        "month_num integer, " +
        "day_num integer, " +
        "hour_num integer  " +
        ") " +
        "using kafka_stream " +
        "options (storagelevel 'MEMORY_AND_DISK_SER_2', " +
        "rowConverter 'org.me.streaming.sensor.test.converter.SensorConverter', " +
        "zkQuorum 'localhost:2181', " +
        " groupId 'sensorConsumer',  topics 'sensorTest:01')")

现在为了解决这个问题,我将 SensorData 对象中的数据类型更改为字符串以及流表中的列数据类型:即:

    "collection_time string, " +
    "sensor_time string, " +

结果,在更改此数据类型后,我能够成功地将数据从 Kafka 流式传输到我的目标列表。

我的问题...我对 SnappyData/Streaming 世界还很陌生,想知道这是否是一个错误(已知/未知),或者是否有更优雅的方法将时间戳数据类型绑定到流表?

******根据响应更新********

这是我的行转换器:

class SensorConverter extends StreamToRowsConverter with Serializable {

  override def toRows(message: Any): Seq[Row] = {

  val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)    

  val sensor = mapper.readValue(message.toString(), classOf[SensorData])

    Seq(Row.fromSeq(Seq(
      sensor.sensor_id,
      sensor.metric,
      sensor.collection_time,
      sensor.value,
      sensor.sensor_time,
      sensor.year_num,
      sensor.month_num,
      sensor.day_num,
      sensor.hour_num)))
  }

}

我最初尝试转换一个 java 对象,但在解码它时遇到了问题(可能是由于我目前在升级时缺乏对 API 的了解)。我最终只是将一个 json 字符串传递给 Kafka。

我在@ https://github.com/SnappyDataInc/snappy-poc/blob/master/src/main/scala/io/snappydata/adanalytics/Codec.scala提供的示例中看到 我没有正确包装传入的时间戳值在构建我的 Seq[Row] 时使用 java.sql.Timestamp 调用(这很长一段时间)。我会试一试,看看是否能解决我的问题。

4

1 回答 1

1

这是一个示例,您可以参考将时间戳与流表一起使用。 https://github.com/SnappyDataInc/snappy-poc/blob/master/src/main/scala/io/snappydata/adanalytics/Codec.scala

请检查 AdImpressionToRowsConverter#toRows 实施。在这种情况下,我们从 kafka 接收长值(System.currentTimeMills)并转换为 java.sql.Timestamp

这是时间戳类型的流表定义 -
https://github.com/SnappyDataInc/snappy-poc/blob/master/src/main/scala/io/snappydata/adanalytics/SnappySQLLogAggregatorJob.scala

你能提供 SensorConvertor#toRows 的实现吗?您是否为 SensorData 对象使用相应的解码器?

于 2016-08-12T11:19:44.687 回答