14

Kafka 0.8.2用于从 AdExchange 接收数据,然后Spark Streaming 1.4.1将数据存储到MongoDB.

我的问题是当我重新启动我的Spark Streaming工作时,例如更新新版本、修复错误、添加新功能。它将继续读取当时最新offsetkafka数据,然后在重新启动作业期间我将丢失 AdX 推送到 kafka 的数据。

我尝试了类似的auto.offset.reset -> smallest方法,但它会从 0 -> last 然后数据很大并且在 db 中重复。

我也尝试设置特定的group.idconsumer.idSpark但它是一样的。

如何将offset消耗的最新火花保存到zookeeper或者kafka然后可以从该火花读取到最新offset

4

4 回答 4

15

createDirectStream 函数的构造函数之一可以获得一个映射,该映射将保存分区 id 作为键和您开始使用的偏移量作为值。

只看这里的api:http ://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html 我所说的地图通常称为:fromOffsets

您可以将数据插入地图:

startOffsetsMap.put(TopicAndPartition(topicName,partitionId), startOffset)

并在创建直接流时使用它:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
                streamingContext, kafkaParams, startOffsetsMap, messageHandler(_))

每次迭代后,您可以使用以下方法获取处理后的偏移量:

rdd.asInstanceOf[HasOffsetRanges].offsetRanges

您将能够使用此数据在下一次迭代中构造 fromOffsets 映射。

您可以在此处查看完整的代码和用法:页面末尾的https://spark.apache.org/docs/latest/streaming-kafka-integration.html

于 2015-08-06T06:55:56.403 回答
2

添加到 Michael Kopaniov 的回答中,如果您真的想使用 ZK 作为存储和加载偏移地图的地方,您可以。

但是,因为您的结果没有输出到 ZK,所以除非您的输出操作是幂等的(听起来不是),否则您将无法获得可靠的语义。

如果可以将您的结果与单个原子操作中的偏移量一起存储在 mongo 中的同一文档中,那可能对您更好。

有关更多详细信息,请参阅https://www.youtube.com/watch?v=fXnNEq1v3VA

于 2015-08-10T17:33:18.187 回答
1

这里有一些代码可以用来在 ZK http://geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/中存储偏移量

这里有一些代码可以用来在调用 KafkaUtils.createDirectStream 时使用偏移量:http: //geeks.aretotally.in/spark-streaming-direct-api-reusing-offset-from-zookeeper/

于 2016-06-29T20:09:36.827 回答
-1

我还没有 100% 弄清楚这一点,但你最好的选择可能是设置 JavaStreamingContext.checkpoint()。

有关示例,请参见https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#checkpointing 。

根据一些博客条目https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md有一些注意事项,但几乎感觉它涉及某些边缘案例,这些案例只是暗示而不是实际上解释。

于 2015-08-07T12:43:59.213 回答