3

在使用 snappy shell 中的 kafka 创建火花流表时,我看到了一个问题。

'异常'无效输入'C',预期dmlOperation,插入,withIdentifier,选择或放置(第1行,第1列):'

参考:http ://snappydatainc.github.io/snappydata/streamingWithSQL/#spark-streaming-overview

这是我的sql:

CREATE STREAM TABLE if not exists sensor_data_stream 
(sensor_id string, metric string)
using kafka_stream 
options (
    storagelevel 'MEMORY_AND_DISK_SER_2',
    rowConverter 'io.snappydata.app.streaming.KafkaStreamToRowsConverter',
    zkQuorum 'localhost:2181',
    groupId 'streamConsumer',
    topics 'test:01');

shell 似乎不喜欢第一个字符“C”处的脚本。我正在尝试使用以下命令执行脚本:

snappy> run '/scripts/my_test_sensor_script.sql';

任何帮助表示赞赏!

4

2 回答 2

3

文档和实际语法存在一些不一致。正确的语法是:

CREATE STREAM TABLE sensor_data_stream if not exists (sensor_id string, 
metric string) using kafka_stream 
options (storagelevel 'MEMORY_AND_DISK_SER_2', 
rowConverter 'io.snappydata.app.streaming.KafkaStreamToRowsConverter', 
zkQuorum 'localhost:2181',
 groupId 'streamConsumer',  topics 'test:01');

您需要做的另一件事是为您的数据编写行转换器

于 2016-08-09T04:18:48.800 回答
0

Mike,您需要通过实现以下特征来创建自己的 rowConverter 类 -

trait StreamToRowsConverter extends Serializable {
  def toRows(message: Any): Seq[Row]
}

然后在 DDL 中指定 rowConverter 完全限定的类名。rowConverter 特定于模式。'io.snappydata.app.streaming.KafkaStreamToRowsConverter' 只是一个占位符类名,应替换为您自己的 rowConverter 类。

于 2016-08-10T06:10:43.983 回答