问题标签 [spring-kafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka - 如何确定 Kafka 消费者的偏移量
我们有一个问题,似乎 Kafka 消费者没有收到发布到主题的消息。(我说似乎我还没有深究,我可能是错的。)
我正在为 Apache Kafka 使用 Spring,而我的使用者实际上是一个用@KafkaListener
.
这个问题是间歇性的,我无法重新创建它。
有没有办法让我查看 Kafka 代理的日志,或任何其他工具来帮助我找出我的消费者的偏移量?我想要具体的证据证明我的消费者是否收到了消息。
avro - 如何使用 Spring Boot 和 Spring Cloud Stream 在 kafka 生产者上设置 avro 内容类型
我正在尝试使用 spring boot、spring 云流模式注册表、kafka binder 与 avro 消息生产者、kafka 和 avro 消费者构建一个工作示例。消费消息时出现以下错误:
错误 7059 --- [afka-listener-1] oscsbkKafkaMessageChannelBinder:无法使用 contentType [application/x-java-object;type=io.igx.android.Sensor] java 反序列化 [io.igx.android.Sensor]。 lang.NullPointerException
我在消费者端有 spring.cloud.stream.bindings.output.contentType=application/ +avro 在生产者端有 spring.cloud.stream.bindings.output.contentType=application/ +avro。
为什么消费者没有看到和使用 avro 内容类型而不是 application/x-java-object。
spring-kafka - 每个主题都需要一个 KafkaTemplate 吗?
看起来可以使用泛型配置 KakfkaTemplate 来指示键和值。密钥类型是否特定于用例?例如一些识别信息,如 personId、patientId,该值将呈现某种 pojo(DTO,可能不是整个 JPA 实体)
这就引出了一个问题,我们是否应该根据用例(针对每个主题)配置多个 KafkaTemplate?
spring-kafka - 尽管返回 ListenableFuture,但 KafkaTemplate 会阻塞
我正在使用 spring-kafka 1.1.2。每当我调用返回 ListenableFuture 的 KafkaTemplate 的 send 方法时,它都会阻塞。
查看 KafkaProducer 的源代码,它实际上似乎是这样打算的——我发现对名为“waitOnMetadata”的方法的调用。
我错过了一些明显的东西吗?向 Kafka 发送记录时如何避免阻塞?
spring-cloud - 使用新版本 Chelsea.RC1 的“条件参数标头”@StreamListener 时出错
我正在尝试使用事件过滤器来减少应用程序使用的主题数量,使用新版本的 spring 云流 (Chelsea.RC1) 中提供的新功能。正在创建具有正确标头的消息,但是,检查队列中消息的内容时,该消息不包含标头,仅包含带有有效负载的正文。
消费者
在消费者服务中,它给出以下警告:
spring - 嗨,spring-kafka 如何处理消费者线程?
有人可以告诉我 spring-Kafka 是否具有 spring-JMS 之类的功能,可以根据负载动态启动线程或减少线程?
通过使用 Kafka,我们是否需要担心这些线程管理的东西?我知道 Kafka 消费者的最佳实践是拥有与您在该主题上拥有的分区数量相同数量的线程。
spring-cloud-stream - 发布带有原始标头的 null/tombstone 消息
我正在构建一个 Spring Cloud Stream Kafka 处理器应用程序,该应用程序将使用带有字符串键的原始数据,有时还会使用来自 Kafka 主题的空负载。我想为另一个主题生成一个字符串键和空有效负载(在 Kafka 中称为墓碑)。为了在消息上使用原始标头,我需要输出 a byte[]
,但如果我编码KafkaNull.INSTANCE
为 a byte[]
,它将逐字输出对象哈希码的字符串。
如果我尝试发送除 a 以外的任何内容byte[]
,则无法使用原始标头。
这样做的正确方法是什么?即使有效负载为空,标头也会在哪里?我设置producer.headerMode=embeddedHeaders
了部署,这似乎没有什么区别,仍然有哈希码作为有效负载。
spring - Spring Boot Kafka:由于 NoSuchBeanDefinitionException 而无法启动消费者
在我的 Spring Boot 服务中,尝试启动一个看到 NoSuchBeanDefinitionException 并且无法启动服务本身的 kafka 消费者。
下面是我的 bean 类,其中包含为 Kafka 配置创建的所有必需的 bean
Spring Boot 版本:1.5.2.RELEASE
这是异常跟踪:
apache-kafka - Spring kafka消费者,在运行时寻求偏移量?
我正在使用 KafkaMessageListenerContainer 从 kafka 主题中消费,我有一个应用程序逻辑来处理每个记录,这也依赖于其他微服务。我现在在处理每条记录后手动提交偏移量。
但是,如果我的应用程序逻辑失败,我需要寻找失败的偏移量并继续处理它,直到它成功。为此,我需要对最后一个偏移量进行运行时手动查找。
KafkaMessageListenerContainer 是否可以做到这一点?
java - 使用 @KafkaListener 处理错误
我正在使用spring-kafka
以下配置:
我的听众看起来像这样:
到目前为止,我一直在查看 的测试和参考文档spring-kafka
,文档说应该ErrorHandler
配置适当的类型,这个测试暗示我应该配置它ContainerProperties
,尽管这只是一个错误处理程序,在我的使用中案例,我想定义多个(针对不同的有效负载类型),这可能吗,如果是的话,如何?
此外,有没有办法描述在带注释的侦听器 void 上使用哪个错误处理程序?
另外,有没有办法描述RecoveryCallback
每个@KafkaListener
或每个不同的主题,或者必须有不同ListenerContainerFactory
的 s?
我可能完全弄错了,有人能指出我正确的方向吗?我如何ErrorHandler
以正确的方式为不同的有效负载类型配置多个 s?