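I am trying to create a SnappyData stream table from Zeppelin. I have a question about the "rowConverter" parameter in the stream table definition.
The Zeppelin notebook is split into several paragraphs:
Paragraph 1: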
import org.apache.spark.sql.Row
import org.apache.spark.sql.streaming.{SchemaDStream, StreamToRowsConverter}
class RowsConverter extends StreamToRowsConverter with Serializable {
  override def toRows(message: Any): Seq[Row] = {
    val log = message.asInstanceOf[String]
    val fields = log.split(",")
    val rows = Seq(Row.fromSeq(Seq(new java.sql.Timestamp(fields(0).toLong),
      fields(1),
      fields(2),
      fields(3),
      fields(4),
      fields(5).toDouble,
      fields(6)
    )))
    rows
  }
}
Paragraph 2:
snsc.sql(
  """CREATE STREAM TABLE adImpressionStream if not exists (sensor_id string,
    metric string) using kafka_stream
    options (storagelevel 'MEMORY_AND_DISK_SER_2',
    rowConverter 'RowsConverter',
    zkQuorum 'localhost:2181',
    groupId 'streamConsumer', topics 'test')"""
)
The first paragraph returns these errors:
error: not found: type StreamToRowsConverter
class RowsConverter extends StreamToRowsConverter with Serializable {
^
<console>:13: error: not found: type Row
override def toRows(message: Any): Seq[Row] = {
^
<console>:16: error: not found: value Row
val rows = Seq(Row.fromSeq(Seq(new java.sql.Timestamp(fields(0).toLong),
The second paragraph returns:
java.lang.RuntimeException: Failed to load class : java.lang.ClassNotFoundException: RowsConverter
I have also tried the default code from the git repository:
snsc.sql("create stream table streamTable (userId string, clickStreamLog string) " +
"using kafka_stream options (" +
"storagelevel 'MEMORY_AND_DISK_SER_2', " +
" rowConverter 'io.snappydata.app.streaming.KafkaStreamToRowsConverter' ," +
"kafkaParams 'zookeeper.connect->localhost:2181;auto.offset.reset->smallest;group.id->myGroupId', " +
"topics 'test')")
But I get a similar error:
java.lang.RuntimeException: Failed to load class : java.lang.ClassNotFoundException: io.snappydata.app.streaming.KafkaStreamToRowsConverter
Could you help me solve this problem? Thank you very much.