9

尝试将大约 50K 消息加载到 KAFKA 主题中。在少数运行开始时低于异常但并非一直如此。

org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state  
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784) ~[kafka-clients-2.0.0.jar:?]  
at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:229) ~[kafka-clients-2.0.0.jar:?]  
at  org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:679) ~[kafka-clients-2.0.0.jar:?]  
at myPackage.persistUpdatesPostAction(MyCode.java:??) ~[aKafka.jar:?]  
...  
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer
attempted an operation with an old epoch. Either there is a newer producer with
the same transactionalId, or the producer's transaction has been expired by the
broker.  

代码块如下:

public void persistUpdatesPostAction(List<Message> messageList ) {
    if ((messageList == null) || (messageList.isEmpty())) {
        return;
    }
    logger.createDebug("Messages in batch(postAction) : "+ messageList.size());
    Producer<String,String> producer = KafkaUtils.getProducer(Thread.currentThread().getName());
    try {
        producer.beginTransaction();
        createKafkaBulkInsert1(producer, messageList, "Topic1");
        createKafkaBulkInsert2(producer, messageList, "Topic2");
        createKafkaBulkInsert3(producer, messageList, "Topic3");
        producer.commitTransaction();
    } catch (Exception e) {
        producer.abortTransaction();
        producer.close();
        KafkaUtils.removeProducer(Thread.currentThread().getName());
    }
}

-----------

static Properties setPropertiesProducer() {
    Properties temp = new Properties();
    temp.put("bootstrap.servers", "localhost:9092");
    temp.put("acks", "all");
    temp.put("retries", 1);
    temp.put("batch.size", 16384);
    temp.put("linger.ms", 5);
    temp.put("buffer.memory", 33554432);
    temp.put("key.serializer",   "org.apache.kafka.common.serialization.StringSerializer");
    temp.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return temp;
}

public static Producer<String, String> getProducer(String aThreadId) {
    if ((producerMap.size() == 0) || (producerMap.get(aThreadId) == null)) {
        Properties temp = producerProps;
        temp.put("transactional.id", aThreadId);
        Producer<String, String> producer = new KafkaProducer<String, String>(temp);
        producerMap.put(aThreadId, producer);
        producer.initTransactions();
        return producer;
    }
    return producerMap.get(aThreadId);
}

public static void removeProducer(String aThreadId) {
    logger.createDebug("Removing Thread ID :" + aThreadId);
    if (producerMap.get(aThreadId) == null)
        return;
    producerMap.remove(aThreadId);
}
4

4 回答 4

19

原因:org.apache.kafka.common.errors.ProducerFencedException:生产者尝试使用旧纪元进行操作。要么存在具有相同 transactionalId 的较新生产者,要么生产者的事务已被代理过期。

此异常消息不是很有帮助。我相信它试图说经纪人不再有客户发送的交易ID的任何记录。这可能是因为:

  • 其他人正在使用相同的事务 ID 并且已经提交了它。根据我的经验,除非您在客户端之间共享事务 ID,否则这种情况不太可能发生。我们确保我们的 id 使用UUID.randomUUID().
  • 交易超时并被经纪人自动化删除。

在我们的例子中,我们经常遇到事务超时,从而产生了这个异常。有 2 个属性控制代理在中止和忘记交易之前记住交易的时间。

  • transaction.max.timeout.ms-- 一个代理属性,它指定在事务被中止和遗忘之前的最大毫秒数。许多 Kafka 版本的默认值似乎是 900000(15 分钟)。 来自 Kafka 的文档说:

    事务允许的最大超时。如果客户端请求的事务时间超过此时间,则代理将在 InitProducerIdRequest 中返回错误。这可以防止客户端超时时间过长,这可能会阻止消费者从事务中包含的主题中读取。

  • transaction.timeout.ms-- 一个生产者客户端属性,在创建事务时以毫秒为单位设置超时。许多 Kafka 版本的默认值似乎是 60000(1 分钟)。来自 Kafka 的文档说:

    在主动中止正在进行的事务之前,事务协调器将等待来自生产者的事务状态更新的最长时间(以毫秒为单位)。

如果transaction.timeout.ms在客户端设置的属性超过了transaction.max.timeout.ms代理中的属性,生产者会立即抛出类似下面的异常:

org.apache.kafka.common.KafkaException: Unexpected error in
InitProducerIdResponse The transaction timeout is larger than the maximum value
allowed by the broker (as configured by transaction.max.timeout.ms).
于 2019-12-05T23:12:46.873 回答
1

我的 Producer 初始化代码中存在竞争条件。我已经通过将 Producer 映射更改为 ConcurrentHashMap 类型来确保线程安全。

于 2018-11-29T16:36:10.947 回答
0

我编写了一个单元测试来重现这一点,从这段 Java 代码中,您可以通过两个相同的 tansactional id 轻松理解这是如何发生的。

  @Test
  public void SendOffset_TwoProducerDuplicateTrxId_ThrowException() {
    // create two producer with same transactional id
    Producer producer1 = KafkaBuilder.buildProducer(trxId, servers);
    Producer producer2 = KafkaBuilder.buildProducer(trxId, servers);

    offsetMap.put(new TopicPartition(topic, 0), new OffsetAndMetadata(1000));

    // initial and start two transactions
    sendOffsetBegin(producer1);
    sendOffsetBegin(producer2);

    try {
      // when commit first transaction it expected to throw exception
      sendOffsetEnd(producer1);

      // it expects not run here
      Assert.assertTrue(false);
    } catch (Throwable t) {
      // it expects to catch the exception
      Assert.assertTrue(t instanceof ProducerFencedException);
    }
  }

  private void sendOffsetBegin(Producer producer) {
    producer.initTransactions();
    producer.beginTransaction();
    producer.sendOffsetsToTransaction(offsetMap, consumerGroup);
  }

  private void sendOffsetEnd(Producer producer) {
    producer.commitTransaction();
  }
于 2020-06-08T13:30:42.263 回答
0

当运行应用程序的多个实例时,transactional.id 所有实例上必须相同,以满足在侦听器容器线程上生成记录时的隔离僵尸。但是,当使用不是由侦听器容器启动的事务来生成记录时,每个实例的前缀必须不同。

https://docs.spring.io/spring-kafka/reference/html/#transaction-id-prefix

于 2020-10-23T09:13:42.590 回答