0

我是一个工作流数据的 scala + kafka。

我对 kafka 中的功能 commitAsync offsetRanges 有 1 个问题

我的代码:

val stream = KafkaUtils.createDirectStream[String, String](
      context,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaConfig)
    )

stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      var isCommit = true
      rdd.foreachPartition { iter =>
        if (iter.nonEmpty) {
          if (!logicOne) {
            isCommit = false
          } else {
            val data = logicTwo()
            if (data != "") {
              isCommit = logicC()
            }
          }
        }
      }
      if (isCommit) {
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
    }


function logicA() : Boolean = {
  // someone code logic at here
}

function logicB() : String = {
  // someone code logic at here
}

function logicC() : Boolean = {
  // someone code logic at here
}

当前,我运行应用程序,变量isCommit总是true 我想要从另一个函数分配的变量值,isCommit或者它从 rdd.foreachPartition 内部接收值。

====另外,我还有1个问题当我们从kafka接收100条消息时,在我们处理成功50条消息后,失败50条消息。我们如何为 50 条成功消息 commitAsync ?

谢谢大家!

4

0 回答 0