我在 kafka 中有一堆消息,并使用火花流来处理这些消息。
当我的代码无法插入我的数据库时,我试图捕捉到这些消息,然后将这些消息重新插入 Kafka,以便稍后处理它们。
为了解决这个问题,我在我的 foreachRDD 函数中创建了一个名为“success”的变量。然后,当我尝试更新到数据库时,我返回一个成功插入的布尔值。我在测试期间注意到的是,当我尝试在 foreachPartition 期间插入时,这似乎效果不佳。当我离开 foreachPartition 函数时,成功值似乎被“重置”了。
stream: DStream[String]
stream
.foreachRDD(rdd => {
if (!rdd.isEmpty()) {
var success = true
rdd.foreachPartition(partitionOfRecords => {
if (partitionOfRecords.nonEmpty) {
val listOfRecords = partitionOfRecords.toList
val successfulInsert: Boolean = insertRecordsToDB(listOfRecords)
logger.info("Insert was successful: " + successfulInsert)
if (!successfulInsert) {
logger.info("logging successful as false. Currently its set to: " + success )
success = false
logger.info("logged successful as false. Currently its set to: " + success )
}
}
})
logger.info("Insert into database successful from all partition: " + success)
if (!success) {
// send data to Kafka topic
}
}
})
然后我的日志输出显示了这一点!
2019-06-24 20:26:37 [INFO] 插入成功:false 2019-06-24 20:26:37 [INFO] 记录成功为 false。目前其设置为:true 2019-06-24 20:26:37 [INFO] 成功记录为 false。目前其设置为:false 2019-06-24 20:26:37 [INFO] 从所有分区成功插入数据库:true
即使在第三个日志中它说当前“成功”设置为 false,但是当我离开 foreachPartition 时,我再次记录它并将其设置回 true。
谁能解释为什么?或者提出不同的方法?