我有一个活泼的流表,可以从 kafka 主题中读取 json。java.sql.Timestamp
经过一些工作,我已经完成了这项工作,但是在尝试将值从我的SensorData
对象映射到流表 时遇到了问题。
错误发生org.apache.spark.sql.catalyst.CatalystTypeConverters
在此方法的第 318 行:
private object StringConverter extends CatalystTypeConverter[Any, String, UTF8String] {
override def toCatalystImpl(scalaValue: Any): UTF8String = scalaValue match {
case str: String => UTF8String.fromString(str)
case utf8: UTF8String => utf8
}
override def toScala(catalystValue: UTF8String): String =
if (catalystValue == null) null else catalystValue.toString
override def toScalaImpl(row: InternalRow, column: Int): String =
row.getUTF8String(column).toString
}
我进行了调试,代码显然在这里期待一个字符串值,但我的 sensorData 对象(和流表)传感器和收集时间是时间戳。因此,它抱怨无法转换价值。
下面是我的 SensorData 类,我用它来映射来自 Kafka 的传入 json 消息中的值。在我的自定义转换器中,然后我将这些值映射到我Seq[Row]
的toRows(...)
方法中。
class SensorData {
var sensor_id: String = _
var metric: String = _
var collection_time: java.sql.Timestamp = _
var sensor_time: java.sql.Timestamp = _
// var collection_time: String = _
// var sensor_time: String = _
var value: String = _
var year_num: Int = _
var month_num: Int = _
var day_num: Int = _
var hour_num: Int = _
}
这是我的流表:
snsc.sql(s"CREATE STREAM TABLE sensor_data_stream if not exists " +
"(sensor_id string, " +
"metric string, " +
"collection_time TIMESTAMP, " +
"value VARCHAR(128), " +
"sensor_time TIMESTAMP, " +
"year_num integer, " +
"month_num integer, " +
"day_num integer, " +
"hour_num integer " +
") " +
"using kafka_stream " +
"options (storagelevel 'MEMORY_AND_DISK_SER_2', " +
"rowConverter 'org.me.streaming.sensor.test.converter.SensorConverter', " +
"zkQuorum 'localhost:2181', " +
" groupId 'sensorConsumer', topics 'sensorTest:01')")
现在为了解决这个问题,我将 SensorData 对象中的数据类型更改为字符串以及流表中的列数据类型:即:
"collection_time string, " +
"sensor_time string, " +
结果,在更改此数据类型后,我能够成功地将数据从 Kafka 流式传输到我的目标列表。
我的问题...我对 SnappyData/Streaming 世界还很陌生,想知道这是否是一个错误(已知/未知),或者是否有更优雅的方法将时间戳数据类型绑定到流表?
******根据响应更新********
这是我的行转换器:
class SensorConverter extends StreamToRowsConverter with Serializable {
override def toRows(message: Any): Seq[Row] = {
val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
val sensor = mapper.readValue(message.toString(), classOf[SensorData])
Seq(Row.fromSeq(Seq(
sensor.sensor_id,
sensor.metric,
sensor.collection_time,
sensor.value,
sensor.sensor_time,
sensor.year_num,
sensor.month_num,
sensor.day_num,
sensor.hour_num)))
}
}
我最初尝试转换一个 java 对象,但在解码它时遇到了问题(可能是由于我目前在升级时缺乏对 API 的了解)。我最终只是将一个 json 字符串传递给 Kafka。
我在@ https://github.com/SnappyDataInc/snappy-poc/blob/master/src/main/scala/io/snappydata/adanalytics/Codec.scala提供的示例中看到 我没有正确包装传入的时间戳值在构建我的 Seq[Row] 时使用 java.sql.Timestamp 调用(这很长一段时间)。我会试一试,看看是否能解决我的问题。