我有一个简单的 spark 流应用程序,它从 Kafka 读取数据,然后在 http 端点(或另一个 kafka - 对于这个问题让我们考虑 http)进行转换后发送这些数据。我正在使用job-server提交作业。
我目前正在使用“auto.offset.reset”="smallest" 和 interval=3s 从源 kafka 开始消费。在幸福的情况下,一切看起来都很好。这是一段摘录:
kafkaInputDStream.foreachRDD(rdd => {
rdd.foreach(item => {
//This will throw exception if http endpoint isn't reachable
httpProcessor.process(item._1, item._2)
})
})
由于“auto.offset.reset”="smallest",这会在一个作业中处理大约 200K 消息。如果我在作业中停止 http 服务器(模拟 POST 中的某些问题)并且 httpProcessor.process 抛出异常,则作业失败并且任何未处理的内容都会丢失。我看到它在那之后每 3 秒轮询一次。
所以我的问题是:
- 我的假设是否正确,如果在接下来的 3 秒工作中,如果它收到 X 条消息并且在遇到错误之前只能处理 Y,那么其余 XY 将不会被处理?
- 有没有办法暂停来自 Kafka 的流/消费?例如,如果出现间歇性网络问题,并且很可能所有消耗的消息都将在那段时间内丢失。不断重试的东西(可能是指数退避),每当 http 端点为 UP 时,再次开始消费。
谢谢