我在 zeppelin 笔记本中保存火花流所消耗的 kafka 消息时遇到问题。
我的代码是:
case class Message(id: Long, message: String, timestamp: Long) extends Serializable
val ssc = new StreamingContext(sc, Seconds(2))
val messagesStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc,
Map("zookeeper.connect" -> "localhost:2181", "group.id" -> "test-consumer-group"),
Map("test" -> 4),
StorageLevel.MEMORY_ONLY)
.map { case (k, v) => implicit val formats = DefaultFormats; JsonMethods.parse(v).extract[Message] }
.filter(_.id % 2 == 0)
val mes = messagesStream.window(Seconds(10))
mes
.map(m => Message(m.id, m.message, m.timestamp))
.foreachRDD( rdd => rdd.toDF.registerTempTable("messages"))
ssc.start()
当我运行%sql select * from messages
它时,它不显示任何数据,但表已定义。如果我在 Cassandra 上将保存更改为 tempTable,它将正确保存并显示数据。不明白为什么会这样。
感谢帮助。