我是一个工作流数据的 scala + kafka。
我对 kafka 中的功能 commitAsync offsetRanges 有 1 个问题
我的代码:
val stream = KafkaUtils.createDirectStream[String, String](
context,
PreferConsistent,
Subscribe[String, String](topics, kafkaConfig)
)
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
var isCommit = true
rdd.foreachPartition { iter =>
if (iter.nonEmpty) {
if (!logicOne) {
isCommit = false
} else {
val data = logicTwo()
if (data != "") {
isCommit = logicC()
}
}
}
}
if (isCommit) {
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
}
function logicA() : Boolean = {
// someone code logic at here
}
function logicB() : String = {
// someone code logic at here
}
function logicC() : Boolean = {
// someone code logic at here
}
当前,我运行应用程序,变量isCommit
总是true
我想要从另一个函数分配的变量值,isCommit
或者它从 rdd.foreachPartition 内部接收值。
====另外,我还有1个问题当我们从kafka接收100条消息时,在我们处理成功50条消息后,失败50条消息。我们如何为 50 条成功消息 commitAsync ?
谢谢大家!