
I am trying to create a SnappyData stream table from Zeppelin. I have a question about the "rowConverter" parameter in the stream table definition.

The Zeppelin notebook is split into paragraphs:

Paragraph 1:

import org.apache.spark.sql.Row
import org.apache.spark.sql.streaming.{SchemaDStream, StreamToRowsConverter}

// Converts each Kafka message (a comma-separated log line) into one Row.
class RowsConverter extends StreamToRowsConverter with Serializable {

  override def toRows(message: Any): Seq[Row] = {
    val log = message.asInstanceOf[String]
    val fields = log.split(",")
    // Field 0 is an epoch-millisecond timestamp, field 5 a double, the rest strings.
    val rows = Seq(Row.fromSeq(Seq(
      new java.sql.Timestamp(fields(0).toLong),
      fields(1),
      fields(2),
      fields(3),
      fields(4),
      fields(5).toDouble,
      fields(6)
    )))

    rows
  }
}
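To rule out problems in the parsing itself, the field handling from toRows can be tried on its own, outside Spark. This is a minimal sketch only: LogParser and parseFields are hypothetical names not in the original, and it assumes the same seven comma-separated fields with an epoch-millisecond timestamp first.

```scala
import java.sql.Timestamp

// Hypothetical helper mirroring the field handling in RowsConverter.toRows.
// It has no Spark dependency, so it can be checked in a plain Scala REPL.
object LogParser {
  // Splits one log line and converts the typed fields:
  // field 0 is assumed to be epoch milliseconds, field 5 a double.
  def parseFields(log: String): Seq[Any] = {
    val fields = log.split(",")
    require(fields.length >= 7, s"expected 7 fields, got ${fields.length}")
    Seq(
      new Timestamp(fields(0).toLong),
      fields(1),
      fields(2),
      fields(3),
      fields(4),
      fields(5).toDouble,
      fields(6)
    )
  }
}
```

If this works, toRows could simply return `Seq(Row.fromSeq(LogParser.parseFields(log)))`, which keeps the Spark-specific part of the converter to one line.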

Paragraph 2:

snsc.sql("CREATE STREAM TABLE IF NOT EXISTS adImpressionStream (sensor_id string, metric string) " +
  "using kafka_stream options (" +
  "storagelevel 'MEMORY_AND_DISK_SER_2', " +
  "rowConverter 'RowsConverter', " +
  "zkQuorum 'localhost:2181', " +
  "groupId 'streamConsumer', " +
  "topics 'test')")

The first paragraph returns these errors:

error: not found: type StreamToRowsConverter
class RowsConverter extends StreamToRowsConverter with Serializable {
                               ^
<console>:13: error: not found: type Row
     override def toRows(message: Any): Seq[Row] = {
                                            ^
<console>:16: error: not found: value Row
       val rows = Seq(Row.fromSeq(Seq(new java.sql.Timestamp(fields(0).toLong),

The second paragraph returns:

java.lang.RuntimeException: Failed to load class : java.lang.ClassNotFoundException: RowsConverter

I have also tried the default code from the GitHub repository:

snsc.sql("create stream table streamTable (userId string, clickStreamLog string) " +
  "using kafka_stream options (" +
  "storagelevel 'MEMORY_AND_DISK_SER_2', " +
  "rowConverter 'io.snappydata.app.streaming.KafkaStreamToRowsConverter', " +
  "kafkaParams 'zookeeper.connect->localhost:2181;auto.offset.reset->smallest;group.id->myGroupId', " +
  "topics 'test')")

But I get a similar error:

java.lang.RuntimeException: Failed to load class : java.lang.ClassNotFoundException: io.snappydata.app.streaming.KafkaStreamToRowsConverter

Can you help me solve this? Thanks a lot.


2 Answers


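You need to make the application-specific classes available on the classpath. Please refer to the steps for setting the classpath here: https://github.com/SnappyDataInc/snappy-poc#lets-get-this-going. Zeppelin will pick up the classpath set in your spark-env.sh.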
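A quick way to confirm whether a converter class is visible at all is to try loading it by name from a notebook paragraph. This is a hypothetical diagnostic (ClasspathCheck is not part of SnappyData); it only reports what the current thread's context class loader can see, which may not be the same loader SnappyData uses when it resolves the rowConverter option.

```scala
// Hypothetical diagnostic: reports whether a class can be loaded by name.
object ClasspathCheck {
  // Returns true if `className` is found by the current thread's context
  // class loader. initialize = false, so no static initializers run.
  def isLoadable(className: String): Boolean = {
    val loader = Thread.currentThread().getContextClassLoader
    try {
      Class.forName(className, false, loader)
      true
    } catch {
      case _: ClassNotFoundException => false
      // e.g. NoClassDefFoundError when a dependency of the class is missing
      case _: LinkageError => false
    }
  }
}
```

For example, `ClasspathCheck.isLoadable("io.snappydata.app.streaming.KafkaStreamToRowsConverter")` returning false suggests the jar containing that class has not reached the interpreter's classpath.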

answered 2017-10-17T02:31:20.587

Add the snappydata interpreter to Apache Zeppelin as described here: https://snappydatainc.github.io/snappydata/howto/use_apache_zeppelin_with_snappydata/

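This lets Zeppelin run on the lead node, so that your code runs in embedded mode. In particular, you need to set the required jars with the "-classpath" option in the cluster configuration.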
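After restarting the cluster with the "-classpath" option, one way to check that the jar reached the JVM running your code is to search the java.class.path system property. This is a hypothetical sketch (ClasspathEntries is an illustrative name), and it assumes the option ends up on the JVM classpath; jars added through other mechanisms may not appear there, so an empty result is not conclusive.

```scala
import java.io.File

// Hypothetical diagnostic: lists JVM classpath entries whose path contains
// the given fragment, such as the file name of your application jar.
object ClasspathEntries {
  def matching(fragment: String): Seq[String] =
    System.getProperty("java.class.path", "")
      .split(File.pathSeparator)
      .toSeq
      .filter(_.contains(fragment))
}
```

For example, `ClasspathEntries.matching("snappy-poc")` would list any classpath entry whose path contains that fragment.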

answered 2017-10-23T11:02:54.157