问题标签 [spring-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
15074 浏览

java - Kafka 消费者异常和偏移提交

我一直在尝试为 Spring Kafka 做一些 POC 工作。具体来说,我想尝试在 Kafka 中消费消息时处理错误的最佳实践。

我想知道是否有人能够提供帮助:

  1. 分享发生故障时 Kafka 消费者应该做什么的最佳实践
  2. 帮助我了解 AckMode Record 的工作原理,以及在侦听器方法中引发异常时如何防止提交到 Kafka 偏移队列。

2的代码示例如下:

鉴于 AckMode 设置为 RECORD,根据文档

当侦听器在处理完记录后返回时提交偏移量。

如果侦听器方法抛出异常,我会认为偏移量不会增加。但是,当我使用下面的代码/配置/命令组合对其进行测试时,情况并非如此。偏移量仍然会更新,并且会继续处理下一条消息。

我的配置:

我的代码:

验证偏移量的命令:

我正在使用 kafka_2.12-0.10.2.0 和 org.springframework.kafka:spring-kafka:1.1.3.RELEASE

0 投票
1 回答
767 浏览

spring-cloud-stream - Spring Cloud Stream 中的批处理模式和自定义错误处理

在开始使用 Spring Cloud Stream 之前,我使用的是 Spring-Kafka 及其对批量消费和自定义错误处理的支持。注意这段代码的最后两行:

但是,使用 Spring Cloud Stream 我找不到如何配置它。我只能找到这些配置属性:

spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset,enableDlq

因此,是否可以(是否)可以在 Spring Cloud Stream 中注册自定义错误处理程序并将 AckMode 设置为 BATCH?

谢谢你的支持。

0 投票
3 回答
23111 浏览

java - Spring Kafka:ApplicationContext中不同对象的多个侦听器

我可以向社区查询一下收听多个主题的最佳方式是什么,每个主题都包含不同类别的消息?

在过去的几天里,我一直在玩 Spring Kafka。到目前为止我的思考过程:

  • 因为在初始化 KafkaListenerContainerFactory 时需要将反序列化器传递给 DefaultKafkaConsumerFactory。这似乎表明,如果我需要多个容器,每个容器都反序列化不同类型的消息,我将无法使用 @EnableKafka 和 @KafkaListener 注释。

  • 这使我认为这样做的唯一方法是实例化多个 KafkaMessageListenerContainers。

  • 鉴于 KafkaMessageListenerContainers 是单线程的,我需要同时收听多个主题,我真的应该使用多个 ConcurrentKafkaMessageListenerContainers。

我会在正确的轨道上吗?有一个更好的方法吗?

谢谢!

0 投票
3 回答
13505 浏览

spring - 添加多个 KafkaListenerContainerFactories 的问题

嗨,我目前正在涉足 Spring Kafka,并成功地将单个 KafkaListenerContainerFactory 添加到我的侦听器中。现在我想添加多个 KafkaListenerContainerFactory(一个用于将在 json 中包含消息的主题,另一个用于字符串)。请参见下面的代码:

运行它会给我以下错误:

我究竟做错了什么?

0 投票
1 回答
759 浏览

spring-kafka - 如何更改 [kafka-consumer-1] 这个线程名称?

我将并发设置为 10,我可以看到 10 个不同的线程 ID,但线程名称都是一样的。如何设置监听器名称?我试过 container.setBeanName 但没有运气。请帮忙。顺便说一句,我使用的是 1.1.2 版本

0 投票
1 回答
493 浏览

spring-kafka - 使用 ConcurrentKafkaListenerContainerFactory 时如何设置主题?

使用 concurrentKafkaListenerContainerFactory 时有什么方法可以设置主题吗?我根本不想做任何注释。

0 投票
1 回答
694 浏览

apache-kafka - 在 Spring-Kafka 中,哪个方法可以像 onPartitionsRevoked 一样做同样的事情?

我知道在 Sping-Kafka 我们有以下方法:

无效 registerSeekCallback(ConsumerSeekCallback 回调);

void onPartitionsAssigned(Map assignments, ConsumerSeekCallback 回调);

void onIdleContainer(Map assignments, ConsumerSeekCallback 回调);

但是哪一个和原生的 ConsumerRebalanceListener 方法 onPartitionsRevoked 一样呢?

“此方法将在重新平衡操作开始之前和消费者停止获取数据之后调用。建议在此回调中将偏移量提交给 Kafka 或自定义偏移量存储,以防止重复数据。”

如果我想实现 ConsumerRebalanceListener,如何传递 KafkaConsumer 引用?我只看到 Spring-Kafka 的消费者。

=========更新======

嗨,加里,当我将 RebalanceListener 添加到 ContainerProperties 中时。我可以看到这两种方法都被触发了。但是,我遇到了异常,说“提交无法完成,因为该组已经重新平衡并将分区分配给另一个成员。这意味着后续调用 poll() 之间的时间比配置的 max.poll 长。 interval.ms,这通常意味着轮询循环花费了太多时间处理消息“你知道吗?

===========更新2 ============

0 投票
1 回答
1041 浏览

apache-kafka - kafka-consumer-groups.sh 等效?

我使用的是 0.10.2 版本。我想知道是否有任何 Kafka java API 可以获取偏移信息,例如使用:

因为我想在不使用 CLI 的情况下在我的应用程序中显示所有这些偏移信息。

0 投票
2 回答
1490 浏览

apache-kafka - Kafka消费者端故障处理和重新交付

如果某些消息未在 kafka 中返回,会发生什么情况。可以说我正在同时使用消息。并且一位消费者无法处理偏移量 = 20 的消息并且没有发回 Ack。但是偏移量=21 的其他消息已被消耗并返回。我怎样才能只重播20?

我是否需要将消息放入 DLQ 并再次消费?如果那里也发生故障怎么办?

我对保证交货有点困惑。

0 投票
2 回答
10663 浏览

spring-boot - Spring Boot - 在 application.properties 中获取 Spring-Kafka 客户端 ID 的主机名

我正在使用 Spring-Kafka 和 Boot 进行项目,并且希望在 application.properties 中获取属性 spring.kafka.consumer.client-Id 的主机名,以便可以在服务器端日志中区分我的每个消费者应该有问题。

有没有办法我可以做到这一点?我检查了 spring boot 参考指南和 java.lang.System 类,但找不到富有成效的指针。