我正在尝试使用火花流读取来自 Kafka 的旧消息。但是,我只能在实时发送消息时检索它们(即,如果我填充新消息,而我的 spark 程序正在运行 - 然后我会收到这些消息)。
我正在更改我的 groupID 和 consumerID 以确保 zookeeper 不只是不提供它知道我的程序以前见过的消息。
假设 spark 将 zookeeper 中的偏移量视为 -1,它不应该读取队列中的所有旧消息吗?我只是误解了 kafka 队列的使用方式吗?我对火花和卡夫卡很陌生,所以我不能排除我只是误解了一些东西。
package com.kibblesandbits
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import net.liftweb.json._
object KafkaStreamingTest {
val cfg = new ConfigLoader().load
val zookeeperHost = cfg.zookeeper.host
val zookeeperPort = cfg.zookeeper.port
val zookeeper_kafka_chroot = cfg.zookeeper.kafka_chroot
implicit val formats = DefaultFormats
def parser(json: String): String = {
return json
}
def main(args : Array[String]) {
val zkQuorum = "test-spark02:9092"
val group = "myGroup99"
val topic = Map("testtopic" -> 1)
val sparkContext = new SparkContext("local[3]", "KafkaConsumer1_New")
val ssc = new StreamingContext(sparkContext, Seconds(3))
val json_stream = KafkaUtils.createStream(ssc, zkQuorum, group, topic)
var gp = json_stream.map(_._2).map(parser)
gp.saveAsTextFiles("/tmp/sparkstreaming/mytest", "json")
ssc.start()
}
运行此程序时,我将看到以下消息。所以我相信这不仅仅是因为设置了偏移量而没有看到消息。
2005 年 14 月 12 日 13:34:08 信息 ConsumerFetcherManager:[ConsumerFetcherManager-1417808045047] 为分区 ArrayBuffer([[testtopic,0],initOffset -1 到代理 id:1,主机:test-spark02.vpc,端口: 9092] , [[testtopic,1], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092] , [[testtopic,2], initOffset -1 to broker id:1,host: test-spark02.vpc,port:9092] , [[testtopic,3], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092] , [[testtopic,4], initOffset -1到代理 id:1,host:test-spark02.vpc,port:9092])
然后,如果我填充 1000 条新消息——我可以看到这 1000 条消息保存在我的临时目录中。但是我不知道如何阅读现有的消息,这些消息应该在(此时)数万。