4

我有一个简单的 spark 流应用程序,它从 Kafka 读取数据,然后在 http 端点(或另一个 kafka - 对于这个问题让我们考虑 http)进行转换后发送这些数据。我正在使用job-server提交作业。

我目前正在使用“auto.offset.reset”="smallest" 和 interval=3s 从源 kafka 开始消费。在幸福的情况下,一切看起来都很好。这是一段摘录:

kafkaInputDStream.foreachRDD(rdd => {
  rdd.foreach(item => {
  //This will throw exception if http endpoint isn't reachable
      httpProcessor.process(item._1, item._2)
  })
})

由于“auto.offset.reset”="smallest",这会在一个作业中处理大约 200K 消息。如果我在作业中停止 http 服务器(模拟 POST 中的某些问题)并且 httpProcessor.process 抛出异常,则作业失败并且任何未处理的内容都会丢失。我看到它在那之后每 3 秒轮询一次。

所以我的问题是:

  1. 我的假设是否正确,如果在接下来的 3 秒工作中,如果它收到 X 条消息并且在遇到错误之前只能处理 Y,那么其余 XY 将不会被处理?
  2. 有没有办法暂停来自 Kafka 的流/消费?例如,如果出现间歇性网络问题,并且很可能所有消耗的消息都将在那段时间内丢失。不断重试的东西(可能是指数退避),每当 http 端点为 UP 时,再次开始消费。

谢谢

4

2 回答 2

3

是的,您的假设是正确的,即如果您的分区失败,则暂时不会处理剩余的事件

但是,您可以调整很多参数以获得所需的行为(如果您使用 DirectKafkaInputDStream)。

让我们从"auto.offset.reset"="smallest"开始:这个参数告诉 Kafka 从头开始​​,当当前组没有存储的提交时。正如您提到的,您的 RDD 在启动后包含大量消息,我假设您没有正确提交消息。如果您期望完全一次的语义,您绝对应该考虑跟踪您的偏移量,因为 DirectKafkaStreamInput 明确不跟踪它。

起始偏移量是预先指定的,这个DStream不负责提交偏移量,这样就可以控制exact-once

在 DirectKafkaInputSream 分支 1.6 中评论

也就是说,当前每次重新启动流式传输作业时,都会重新处理您的消息。

如果您提交已处理的偏移量并在启动时将其传递到 InputDStream,则侦听器将从最后提交的偏移量继续。

关于背压,DirectKafkaInputDStream 已经使用了一个RateController,它估计应该在一批中处理多少事件。

要使用它,您必须启用背压:

"spark.streaming.backpressure.enabled": true

您还可以限制“spark.streaming.kafka.maxRatePerPartition”以添加批处理大小的上限。

如果您想自己控制背压(并且可能完全停止消费者一段时间),您可能需要实现一些StreamingListener方法并在您的工作中使用它。例如,您可以使用 StreamingListener 在每个完成批次后决定是否停止您的流式传输作业。

于 2016-04-29T05:32:41.433 回答
1

我认为 Spring Cloud Stream 可以解决您的问题。卡夫卡是来源。Spark Streaming 是处理器。Http 是接收器。只有当有来自 Kafka 的输入时,Spark Streaming 才会处理。您无需停止或恢复来自 Kafka 的输入。希望能帮助到你。

于 2016-04-27T07:56:49.877 回答