I wrote a Spark Kafka producer that pulls messages from Hive and pushes them into Kafka. Most of the records (messages) end up duplicated after ingestion into Kafka, even though there are no duplicates before they are pushed. I added the configuration related to exactly-once semantics and made the Kafka producer idempotent.
Below is the code snippet I use for the Kafka producer:
import java.util.{Properties, UUID}

import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.errors.{AuthorizationException, OutOfOrderSequenceException, ProducerFencedException}
import org.apache.log4j.LogManager
import org.apache.spark.sql.DataFrame

object KafkaWriter {

  lazy val log = LogManager.getLogger("KafkaWriter")

  private def writeKafkaWithoutRepartition(df: DataFrame, topic: String, noOfPartitions: Int,
                                           kafkaBootstrapServer: String): Unit = {
    log.info("Inside Method KafkaWriter::writeKafkaWithoutRepartition no of partitions =" + noOfPartitions)
    df.foreachPartition(
      iter => {
        val properties = getKafkaProducerPropertiesForBulkLoad(kafkaBootstrapServer)
        properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString + "-" + System.currentTimeMillis().toString)
        log.info("Inside Method writeKafkaWithoutRepartition:: inside writekafka :: kafka properties ::" + properties)
        val kafkaProducer = new KafkaProducer[String, String](properties)
        try {
          log.info("kafka producer property enable.idempotence ::" + properties.getProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG))
          kafkaProducer.initTransactions()
          kafkaProducer.beginTransaction()
          log.info("Inside Method writeKafkaWithoutRepartition:: inside each partition kafka transactions started")
          iter.foreach(row => {
            log.info("Inside Method writeKafkaWithoutRepartition:: inside each iterator record kafka transactions started")
            kafkaProducer.send(new ProducerRecord(topic, row.getAs[String]("value")))
          })
          kafkaProducer.commitTransaction()
          log.info("Inside Method writeKafkaWithoutRepartition:: kafka transactions completed")
        } catch {
          case e @ (_: ProducerFencedException) =>
            // We can't recover from these exceptions, so our only option is to close the producer and exit.
            log.error("Exception occurred while sending records to kafka ::" + e.getMessage)
            kafkaProducer.close()
          case e: KafkaException =>
            // For all other exceptions, just abort the transaction and try again.
            log.error("Exception occurred while sending records to kafka ::" + e.getMessage)
            kafkaProducer.abortTransaction()
          case ex: Exception =>
            // For all other exceptions, just abort the transaction and try again.
            log.error("Exception occurred while sending records to kafka ::" + ex.getMessage)
            kafkaProducer.abortTransaction()
        } finally {
          kafkaProducer.close()
        }
      })
  }

  def writeWithoutRepartition(df: DataFrame, topic: String, noOfPartitions: Int, kafkaBootstrapServer: String): Unit = {
    val repartitionedDF = df.selectExpr("to_json(struct(*)) AS value")
    log.info("Inside KafkaWriter::writeWithoutRepartition ")
    writeKafkaWithoutRepartition(repartitionedDF, topic, noOfPartitions, kafkaBootstrapServer)
  }

  def getKafkaProducerPropertiesForBulkLoad(kafkaBootstrapServer: String): Properties = {
    val properties = new Properties
    properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer)
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
    // properties.setProperty(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, "400000")
    // properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "300000")
    properties.put(ProducerConfig.RETRIES_CONFIG, "1000")
    properties.put(ProducerConfig.ACKS_CONFIG, "all")
    properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
    properties
  }
}
On the Kafka consumer side, isolation.level is set to read_committed.
Also tried setting min.insync.replicas --> 2 (in my opinion this property probably doesn't play a major role here, but tried it anyway).
Spark version: 2.3.1, Kafka client version: 2.2.1.
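In case it helps, the consumer used to check for duplicates is configured roughly like this (a sketch only; the broker address and group id below are placeholders, not the real values from my setup):

import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

// Consumer-side settings used while verifying the duplicates (illustrative values).
val consumerProps = new Properties
consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092")          // placeholder
consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "duplicate-check-group")          // placeholder
consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
// Only read messages from committed transactions (the default is read_uncommitted).
consumerProps.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
val consumer = new KafkaConsumer[String, String](consumerProps)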
I also use transactions when producing the messages to Kafka, initializing, beginning, and committing the transaction for each message. I ingest around 100 million records in one run, and I do split the data into smaller chunks, e.g. the 100 million is split into chunks of 1 million before being pushed into Kafka, as sketched below.
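The chunking is along these lines (a sketch only; randomSplit, the chunk count of 100, and the topic/noOfPartitions/kafkaBootstrapServer values are placeholders for how the data is actually divided):

// Illustrative only: split the large DataFrame into ~100 roughly equal chunks
// and push each chunk to Kafka in its own call to writeWithoutRepartition.
val chunks = df.randomSplit(Array.fill(100)(1.0))
chunks.foreach { chunk =>
  KafkaWriter.writeWithoutRepartition(chunk, topic, noOfPartitions, kafkaBootstrapServer)
}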
Tried using Structured Streaming (the built-in Kafka sink) as well, still no luck:
df.selectExpr(s""" '${key}' as key """, "to_json(struct(*)) AS value").write.format("kafka").options(getKafkaProducerProperties(topic)).save
I'm not sure whether I'm missing any configuration on the Kafka producer, broker, or consumer side, or whether there is anything else I should include here.
Thanks in advance.