4

我在 zeppelin 笔记本中保存火花流所消耗的 kafka 消息时遇到问题。

我的代码是:

case class Message(id: Long, message: String, timestamp: Long) extends Serializable

   val ssc = new StreamingContext(sc, Seconds(2))

  val messagesStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, 
    Map("zookeeper.connect" -> "localhost:2181", "group.id" -> "test-consumer-group"),
    Map("test" -> 4),
    StorageLevel.MEMORY_ONLY)
    .map { case (k, v) =>  implicit val formats = DefaultFormats; JsonMethods.parse(v).extract[Message] }
    .filter(_.id % 2 == 0)

  val mes =  messagesStream.window(Seconds(10))

  mes
  .map(m => Message(m.id, m.message, m.timestamp))
  .foreachRDD( rdd => rdd.toDF.registerTempTable("messages"))

  ssc.start() 

当我运行%sql select * from messages它时,它不显示任何数据,但表已定义。如果我在 Cassandra 上将保存更改为 tempTable,它将正确保存并显示数据。不明白为什么会这样。

感谢帮助。

4

2 回答 2

2

好的,这就是问题所在。让我们首先回顾一下 foreachRDD 运算符的定义:

foreachRDD未按预期使用。它是最通用的输出运算符,它将函数 func 应用于从流生成的每个 RDD。该函数应该将每个 RDD 中的数据推送到外部系统,例如将 RDD 保存到文件中,或者通过网络将其写入数据库。请注意,函数 func 在运行流式应用程序的驱动程序进程中执行,并且通常会在其中包含 RDD 操作,这将强制计算流式 RDD。

因此,您的代码实际发生的情况如下:

由于 DStream 由输出操作延迟执行,就像 RDD 由 RDD 操作延迟执行一样。具体来说,DStream 输出操作中的 RDD 操作会强制处理接收到的数据。因此,如果您的应用程序没有任何输出操作,而您没有,或者有像 dstream.foreachRDD() 这样的输出操作,但其中没有任何 RDD 操作,则不会执行任何操作。系统将简单地接收数据并将其丢弃

因此,每次执行时都会丢弃 RDD 数据registerTempTable,因此 SQL 查询会给出空结果。

要解决您的问题,您需要将数据保存在某个地方(Cassandra 是一个不错的选择),然后对其进行查询。

于 2016-01-24T23:29:07.877 回答
0

如果您想避免另一个集群:另一种解决方案是将 rdd 转换为 row,然后转换为 df,然后将其作为 parquet 或 orc 保存到 hdfs,并带有附加文件 ex 的选项:

write.format("orc").mode("overwrite").partitionBy("id").save("/tmp/data")

我只是想知道 AWS 博主如何能够直接在临时表上执行分析 [在此处输入链接描述][1]

好消息是结构化流即将推出 :)

[1]:aws 博客:https ://blogs.aws.amazon.com/bigdata/post/Tx3K805CZ8WFBRP/Analyze-Realtime-Data-from-Amazon-Kinesis-Streams-Using-Zeppelin-and-Spark-Stream

于 2016-08-15T20:25:15.147 回答