问题标签 [spring-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
9140 浏览

spring - 重新平衡后,kafka 停止使用来自新分配的分区的消息

我对 kafka 很陌生(对英语也很陌生......),我面临这个问题并且无法谷歌任何解决方案。

我使用spring-boot,spring-kafka支持,我在本地机器上安装了kafka_2.11-0.10.1.1(只有一个broker 0)

s1.then 我创建主题

我的消费者配置:applitions.properties:

s2。然后我通过更改“kafka.client.id”启动 3 个消费者并运行 spring-boot main class 。在 Eclipse 控制台上,我可以检查分区分配:

s3。启动 pruducer 向主题发送 20 条消息,每条消息开始消费特定分区的消息

s4。我关闭消费1,kafka自动重新平衡,新分区分配:

s5。我发现分区'tracking-3'上的消息没有被消耗!!

每次都可以重现问题,消除新分配分区中的一些消息丢失,您有什么建议吗?请帮助我,谢谢

0 投票
2 回答
15753 浏览

jackson - 无效的 UTF-8 中间字节 0x72

我在 spring-kafka 中使用 JsonSerializer 和 JsonDeserializer 在生成消息时设置值序列化器。该消息有一个字段(orgName),其中包含一个特殊字符(德语变音符号)。如何处理这个特殊字符?我知道 JsonDeserializer 使用杰克逊,杰克逊支持 utf-8。JsonDeserializer 会因此引发此错误:

0 投票
2 回答
2486 浏览

spring - Spring Kafka 消费者:有没有办法使用 Kafka 0.8 从多个分区中读取?

这是场景:我知道使用与 Spring kafka 相关的最新 API(如 Spring-integration-kafka 2.10)我们可以执行以下操作:

并从与同一 kafka 主题相关的不同分区中读取。

我想知道我们是否可以使用例如 spring-integration-kafka 1.3.1 来做同样的事情

我没有找到任何关于如何做到这一点的提示(我对 xml 版本很感兴趣)。

0 投票
1 回答
312 浏览

spring-integration - 将 Spring kafka 1.1.2 与 Spring 框架 3.1.1 一起使用

我对弹簧集成和弹簧卡夫卡相当陌生。我必须为 kakfa 编写生产者,并计划使用 spring kafka 1.1.2(正如我之前使用过的那样)并且父项目正在使用 spring framework 3.1.1 我想了解如何编写 xml 配置对于 spring kafka 1.1.2,因为我在文档中找不到任何内容。

另外,我阅读了一些关于 spring 集成 kafka 模块的信息,但不太确定是否应该使用该模块。

0 投票
1 回答
1365 浏览

java - 无法创建 Kafka 消费者

我正在使用 spring-kafka,并且正在尝试创建一个 Kafka 消费者。我参考了http://howtoprogram.xyz/2016/09/23/spring-kafka-tutorial/https://www.codenotfound.com/2016/09/spring-kafka-consumer-producer-example.html . 我目前使用的是参考文献中完全相同的代码。

这是接收器类

这是 KafkaConsumerConfig

只要我用@EnableKafka 注释我的类,它有@KafkaListener 方法。我收到以下错误。我很难理解我是否在某个地方犯了错误。

0 投票
1 回答
7267 浏览

apache-kafka - Kafka 自定义反序列化器转换为 Java 对象

我正在使用 Spring Kafka 集成,并且我有自己的值通用序列化器/反序列化器,如下所示

序列化器:

解串器:

序列化程序运行良好,但是在消费消息时对值进行反序列化时,我得到的是一个LinkedHashMap而不是期望的对象,请告诉我我在哪里弄错了,在此先感谢。

0 投票
1 回答
6514 浏览

java - spring kafka thorws InstanceAlreadyExistsException 设置并发> 1后异常

我正在使用spring-kafka,如果我不设置ConcurrentKafkaListenerContainerFactory的并发,一切正常,当我将它设置为大于1的数字时,我得到一个异常:

javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=client-3

我的配置:

特性:

0 投票
3 回答
34800 浏览

apache-kafka - 发送到 kafka 主题时序列化消息时出错

我需要测试一条包含标题的消息,所以我需要使用 MessageBuilder,但我无法序列化。

我尝试在生产者道具上添加序列化设置,但它不起作用。

有人能帮我吗?

这个错误:

我的测试课:

}

0 投票
2 回答
440 浏览

spring - Spring Kafka 1.2.0.Relase.Jar、Spring-boot-starter 1.5.3 和 spring-messaging 4.2.0

我正在尝试运行 spring-kafka 集成。当我尝试启动 Spring Boot 时,我收到以下错误消息。 https://github.com/spring-projects/spring-kafka/issues/172 根据上面的链接,4.3 兼容 Kafka 1.1 及以上版本。有没有人尝试过这种组合。请记住,如果您使用 spring-messaging(4.3 版)代码将无法编译并且 Eclipse 会抱怨 maven 配置是

0 投票
2 回答
5265 浏览

spring-boot - Spring boot:不包括一些自动配置的 bean

我有一个使用 spring-kafka 的 Spring Boot 项目。在这个项目中,我构建了一些包装 spring-kafka bean 的事件驱动组件(即 KafkaTemplate 和 ConcurrentKafkaListenerContainer)。我想让这个项目成为跨一组 Spring Boot 应用程序的可重用库。但是,当我从 Spring Boot 应用程序向该库添加依赖项时,在应用程序启动时出现错误:

因为我需要自动装配 a ConsumerFactory<A, B>(而不是 a ConsumerFactory<Object, Object>),所以我在一个使用 @EnableConfigurationProperties(KafkaProperties.class) 注释的配置类中创建了这个 bean

我需要的只是重用 org.springframework.boot.autoconfigure.kafka.KafkaProperties,而无需在 KafkaAutoConfiguration 和 KafkaAnnotationDrivenConfiguration 中自动配置其他 bean。

我尝试将@EnableAutoConfiguration(exclude = KafkaAutoConfiguration.class)放入我的库中,但这并不能阻止依赖于我的库的应用程序触发库中排除的 spring-kafka 自动配置。

如何指定我不希望在我的库中以及在依赖此库的任何应用程序中自动配置某些 bean(KafkaAutoConfiguration 和 KafkaAnnotationDrivenConfiguration) ?