问题标签 [spring-kafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
java - 如何使用 xml 配置升级到 spring-integration-kafka 2.1.0.RELEASE?
我正在将 spring-integration-kafka 从 1.0.0.M2 升级到 2.1.0.RELEASE 和客户端 0.9.0 到 0.10.0
当前的xml配置如下
如何将其更改为 2.1.0.RELEASE ?
~~~~~~~~~~~~~~~
在这里编辑:
使用参考根据我的要求修改了 xml。我在阅读消费者记录时遇到了小问题。我得到的有效载荷如下
我需要值(NotificationVo)以供消费者进一步使用。如何将其作为有效载荷的一部分?
~~~~~~~~~~~~~~~
在这里编辑:
这是修改后的 xml 配置文件
~~~~~~~~~~~~~~~
在这里编辑:
消费类:
输出/输出:
~~~~~~~~~~~~~~~
在这里编辑:
更改 Consumer 类中的方法参数后收到预期的有效负载。
输出/输出:
终于按预期工作了。
非常感谢您的支持。
java - 如何在 spring-integration-kafka 2.1.0.RELEASE 和 Kafka 0.10.0 中为不同主题配置不同的 Producer?
在将 Kafka 从 0.9.0 升级到 0.10.0 时,在将不同的生产者配置到不同的主题时遇到问题。下面给出的 XML 配置
分别发布 2 个主题时出错。堆栈跟踪如下
定义了两个单独的 Serializer 和 Deserializer 类。但是它如何在内部引用其他类?我错过了任何配置吗?
spring-boot - spring-kafka - 如何从头开始阅读一个主题,同时从头开始阅读另一个主题?
我正在编写一个 spring-kafka 应用程序,其中我需要阅读 2 个主题:test1 和 test2:
我的配置如下所示:
我需要能够从“test1”中读取最新消息,同时能够从“test2”一开始就读取所有消息。我只对应用程序启动时的“test2”消息感兴趣,但只要应用程序正在运行,就需要不断地阅读“test1”消息。
有没有办法配置这样的功能?
java - Spring Kafka JsonSerializer 使用
我正在尝试按照此处的说明进行操作:
设置一个可以序列化和发送我拥有的一些简单 Java POJO 的 KafkaTemplate。但我发现文档含糊不清,尤其是这部分:
为此,Spring for Apache Kafka 还提供了基于 Jackson JSON 处理器的 JsonSerializer/JsonDeserializer 实现。当 JsonSerializer 非常简单并且只允许将任何 Java 对象编写为 JSON 字节 []
...
尽管从低级 Kafka Consumer 和 Producer 的角度来看,Serializer/Deserializer API 非常简单和灵活,但在存在 KafkaTemplate 和 @KafkaListener 的消息传递级别上还不够。
...
MessageConverter 可以直接注入 KafkaTemplate 实例,也可以通过 @KafkaListener.containerFactory() 属性的 AbstractKafkaListenerContainerFactory bean 定义注入
所以我的问题是:
- 我的 KafkaTemplate 是什么类型的?是
KafkaTemplate<String, Object>
吗?或者是KafkaTemplate<String, String>
吗? - 我的序列化程序类是什么?是
StringSerializer
,还是JsonSerializer
? - 我
kafkaTemplate.setMessageConverter(new StringJsonMessageConverter())
在创建 KafkaTemplate bean 时使用吗?
如果这些是愚蠢的问题,我深表歉意 - 我试图理解设置它的正确方法,而不是“破解它直到它有点工作”。
java - spring-kafka: how to pass a method to another method, which (by design) never returns?
totally new to java.
writing a spring-kafka app, which has the following Kafka listener method:
I've been asked to refactor this part of the app into a separate stand-alone package, so the "receiveMessage" could be called, and a method / function could be passed to it, in place of "handleMessage", to process each incoming message.
This "receiveMessage" method never returns, as it keeps listening on a Kafka topic.
What would be the proper syntax, to change / add in this method, so it could be called as a package / library, and a message handling method would be passed to it by the calling app:
apache-kafka - Spring-Kafka 批次不更新较大批次的偏移量
我已经使用 Spring Kafka 文档中的批处理模式在 Spring Boot App 中设置了 Kafka Consumer。在我们对主题进行加载之前,它似乎一直有效。然后偏移量没有更新,它会一遍又一遍地处理同一批消费者消息。
默认情况下,根据框架中的某些算法,批量大小似乎是动态的。因此,随着主题越来越多,它从成百上千条消息批量变成了数千条消息。我可以通过将批处理大小(max.poll.records)设置为 25 来解决这个问题,但在我了解根本原因之前,我并不真正信任 Kafka/Spring-Kafka。
批处理需要 3000 毫秒到 15000 毫秒才能完成。
工厂属性:
工厂建设:
侦听器服务使用注解:
spring - 在 Spring 中重新使用 Kafka 0.10.1 主题
在 Spring-Kafka 中,我想从头开始重新使用 Kafka 主题。通过将 更改group.id
为 Kafka 未知的内容来做到这一点当然可以:
但是,通过将偏移量设置为 0 来重新开始会失败。
我得到的错误:
有谁熟悉这个?
我正在使用Kafka 0.10.1.0
和
java - 在多集群环境中多次生成/使用相同的消息
请在下面找到两台服务器中 kafka 消费者和 kafka 生产者的给定配置
消费者(C):
生产者(P):
我在服务器 1 和服务器 2 中部署相同的 kafka 消费者 (C) 和 kafka 生产者 (P),连接到同一个 kafka 代理。
每当我在服务器 1 中生成一条消息时,它就会被消耗四次(获得 4 条消息)。我的要求是生成的消息应该只被消费一次。
这里发生的是:比方说,通过调用 applicaton.send(message); 发送的消息;
因此,两个 P 都充当生产者,并且每个人都将其传递给自己的消费者和其他消费者,所以,
因此 1 条消息产生 2 次并消耗 4 次。
我想知道,有什么方法可以限制它,一旦消息被生产/消费,如果它第二次发送给同一个生产者/消费者,我们可以唯一地识别它并避免在上述配置中生产/消费它。
java - 是否有对 kafka 0.8.2.2 的弹簧支持?
我想使用spring支持与kafka集成。服务器版本是 0.8.2.2,但我能找到的所有 spring 项目都使用更新的 kafka。甚至 spring-kafka-1.0.0 也使用 kafka 0.9.0.1。我读到 kafka 客户端不向后兼容。所以问题是:是否有对旧版 kafka 的 Spring 支持,或者我必须获得官方的 kafka 客户端并自己做?
spring-boot - 由 Spring Boot Actuator 支持的 Kafka MetricsReporter
我决定回到我将 Kafka Metrics 与 Spring Boot Actuator 集成的想法,我在这里提到过:
https://github.com/spring-projects/spring-boot/issues/6227
到目前为止,我有一个带有工作代码的单独“沙盒”项目,我想将其合并到 Spring Boot 中。现在我有点困惑。我的部分测试需要 powermock 来模拟 Kafka 的“超级安全”类:
但在 Spring Boot 中不使用 powermock。
我应该怎么办?
- 添加最新的稳定 powermock - 在 spring-boot-dependencies 和 spring-boot-actuator 中。
- 仅将最新的稳定 powermock 添加到 spring-boot-actuator - 让它成为一个隐藏的秘密助手。
- 排除需要 powermock 的测试。
- 忘记 Kafka Metrics,它们又大又可怕,没有人希望它们出现在我们漂亮友好的 Spring Boot Actuator 中。