
I wrote a Spark Kafka producer that pulls messages from Hive and pushes them into Kafka. When we ingest into Kafka, most of the records (messages) end up duplicated, even though there are no duplicates before pushing to Kafka. I have added the configuration related to exactly-once semantics to make the Kafka producer idempotent.

Below is the code snippet I use for the Kafka producer:

import java.util.{Properties, UUID}

import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.errors.{AuthorizationException, OutOfOrderSequenceException, ProducerFencedException}
import org.apache.log4j.LogManager
import org.apache.spark.sql.DataFrame

object KafkaWriter {

   lazy val log = LogManager.getLogger("KafkaWriter")


   private def writeKafkaWithoutRepartition(df: DataFrame, topic: String, noOfPartitions: Int,
                                            kafkaBootstrapServer: String): Unit = {
      log.info("Inside Method KafkaWriter::writeKafkaWithoutRepartition no of partitions =" + noOfPartitions)
      df.foreachPartition(
         iter => {
            val properties = getKafkaProducerPropertiesForBulkLoad(kafkaBootstrapServer)
            properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString + "-" + System.currentTimeMillis().toString)
            log.info("Inside Method writeKafkaWithoutRepartition:: inside writekafka :: kafka properties ::" + properties)
            val kafkaProducer = new KafkaProducer[String, String](properties)
            try {
               log.info("kafka producer property enable.idempotence ::" + properties.getProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG))

               kafkaProducer.initTransactions
               kafkaProducer.beginTransaction
               log.info("Inside Method writeKafkaWithoutRepartition:: inside each partition kafka transactions started")

               iter.foreach(row => {
                  log.info("Inside Method writeKafkaWithoutRepartition:: inside each iterator record kafka transactions started")
                  kafkaProducer.send(new ProducerRecord(topic, row.getAs[String]("value")))
               })
               kafkaProducer.commitTransaction
               log.info("Inside Method writeKafkaWithoutRepartition:: kafka transactions completed")
            } catch {
               case e @ (_: ProducerFencedException | _: OutOfOrderSequenceException | _: AuthorizationException) =>
                  // We can't recover from these exceptions, so our only option is to close the producer and exit.
                  log.error("Exception occurred while sending records to kafka ::" + e.getMessage)
                  kafkaProducer.close
               case e: KafkaException =>
                  // For all other Kafka exceptions, just abort the transaction and try again.
                  log.error("Exception occurred while sending records to kafka ::" + e.getMessage)
                  kafkaProducer.abortTransaction
               case ex: Exception =>
                  // For any other exception, abort the transaction as well.
                  log.error("Exception occurred while sending records to kafka ::" + ex.getMessage)
                  kafkaProducer.abortTransaction
            } finally {
               kafkaProducer.close
            }

         })
   }

   def writeWithoutRepartition(df: DataFrame, topic: String, noOfPartitions: Int, kafkaBootstrapServer: String): Unit = {
      val repartitionedDF = df.selectExpr("to_json(struct(*)) AS value")
      log.info("Inside KafkaWriter::writeWithoutRepartition ")
      writeKafkaWithoutRepartition(repartitionedDF, topic, noOfPartitions, kafkaBootstrapServer)
   }

   def getKafkaProducerPropertiesForBulkLoad(kafkaBootstrapServer: String): Properties = {
      val properties = new Properties
      properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer)
      properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
      properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
      properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
//      properties.setProperty(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, "400000")
//      properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "300000")
      properties.put(ProducerConfig.RETRIES_CONFIG, "1000")
      properties.put(ProducerConfig.ACKS_CONFIG, "all")
      properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
      properties
   }

}

On the Kafka consumer side, isolation.level is set to read_committed.
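
For reference, a minimal sketch of that consumer-side setting, assuming a plain Java-client consumer (the broker list and group id below are placeholders, not from my actual consumer):

import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

val consumerProperties = new Properties
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092")   // placeholder broker list
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "duplicate-check-group")   // placeholder group id
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
// only return records from committed transactions
consumerProperties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")

val consumer = new KafkaConsumer[String, String](consumerProperties)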

Also tried setting min.insync.replicas --> 2 (in my view this property probably does not play a major role here, but I tried it anyway).

Spark version: 2.3.1
Kafka client version: 2.2.1

And I also use transactions while producing messages into Kafka, initializing, beginning, and committing the transaction for every message. I ingest roughly 100 million records at a time, and I do split the data into smaller chunks, e.g. splitting the 100 million into chunks of 1 million before ingesting into Kafka.
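
For context, the chunking is along these lines, as a rough sketch only (randomSplit and the chunk count here are illustrative, not the exact mechanism I use):

import org.apache.spark.sql.DataFrame

// Illustrative only: split the extracted DataFrame into smaller pieces and push
// each piece through KafkaWriter in its own pass instead of all 100 million rows at once.
def writeInChunks(df: DataFrame, topic: String, noOfPartitions: Int,
                  kafkaBootstrapServer: String, numChunks: Int): Unit = {
   val chunks = df.randomSplit(Array.fill(numChunks)(1.0), seed = 42L)
   chunks.foreach { chunk =>
      KafkaWriter.writeWithoutRepartition(chunk, topic, noOfPartitions, kafkaBootstrapServer)
   }
}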

Tried using Structured Streaming as well, still no luck:

df.selectExpr(s""" '${key}' as key """, "to_json(struct(*)) AS value").write.format("kafka").options(getKafkaProducerProperties(topic)).save
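
Here getKafkaProducerProperties(topic) is my own helper; as a hedged sketch, the options map it returns is along these lines (kafka.bootstrap.servers and topic are the standard option keys of the built-in Kafka sink; the broker list and the extra producer overrides below are placeholders, not my exact values):

// Hypothetical sketch of such an options map; "kafka."-prefixed keys are passed
// through to the underlying Kafka producer by the built-in sink.
def getKafkaProducerProperties(topic: String): Map[String, String] = Map(
   "kafka.bootstrap.servers"  -> "broker1:9092,broker2:9092",   // placeholder broker list
   "topic"                    -> topic,
   "kafka.acks"               -> "all",
   "kafka.enable.idempotence" -> "true"
)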

I am not sure whether I am missing any configuration on the Kafka producer, broker, or consumer side, or whether there is any other configuration I could add.

Thanks in advance.
