问题标签 [spring-kafka-test]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
spring - Spring KafkaListener:如何知道它何时准备就绪
我有一个简单的 Spring Boot 应用程序,它从 Kafka 读取并写入 Kafka。我写了一个SpringBootTest
使用 anEmbeddedKafka
来测试所有这些。
主要问题是:有时测试失败是因为测试过早发送Kafka消息。这样,在 Spring 应用程序(或者KafkaListener
准确地说)准备好之前,消息就已经写入 Kafka。由于侦听器从latest
偏移量读取(我不想为我的测试更改任何配置 - 除了 bootstrap.servers),它不会收到该测试中的所有消息。
有谁知道我如何在测试中知道它KafkaListener
已准备好接收消息?
我能想到的唯一方法是等到/health
可用,但我不知道我是否可以确定这意味着KafkaListener
要准备好。
任何帮助是极大的赞赏!
此致。
spring - Spring Kafka @SendTo 不发送标头
我正在使用 向 Kafka 发送消息,ReplyingKafkaTemplate
并且它正在使用kafka_correlationId
. 但是,当它点击我的@KafkaListener
方法并将其转发到回复主题时,标题会丢失。
如何保留 kafka 标头?
这是我的方法签名:
我已经创建了一个ProducerInterceptor
,所以我可以看到从ReplyingKafkaTemplate
以及@SendTo
注释中发送了哪些标头。从那开始,另一个奇怪的事情是ReplyingKafkaTemplate
没有将记录的kafka_replyTopic
标题添加到消息中。
以下ReplyingKafkaTemplate
是配置的方式:
我不确定这是否相关,但我也添加了Spring Cloud Sleuth作为依赖项,并且当我发送消息时 span/trace 标头在那里,但是在转发消息时会生成新的标头。
unit-testing - 如何在 spring-boot 应用程序中使用 @KafkaListener 注释测试方法?
我有一个spring
带有@KafkaListener
方法的组件:
现在,我想测试这个方法。我想确保此方法正确接收消息。我尝试创建a Unit test
:
但我不明白接下来会发生什么。我该如何测试这种方法?
spring-kafka - 如何触发和处理这些 Spring Kafka 事件
在我的 Spring Boot 项目中,我有许多 Spring Kafka 消费者,我添加了一些事件侦听器来监控这些消费者的健康状况。这是代码:
有谁知道在什么情况下会抛出这些事件(也许我如何手动触发这些事件以进行测试)?对于最后三个事件(ConsumerStoppedEvent、ListenerContainerIdleEvent 和 NonResponsiveConsumerEvent),当我得到其中一个时,是否需要人工干预来解决问题(例如重新启动服务器以再次创建消费者)?谢谢!
spring-kafka - 如何针对在服务器上运行的真实 kafka 代理测试 kafka 消费者?
我很难理解 Java Spring Boot 中的一些 Kafka 概念。我想针对在服务器上运行的真实 Kafka 代理测试消费者,该服务器有一些生产者写入/已经将数据写入各种主题。我想与服务器建立连接,使用数据,并在测试中验证或处理其内容。
互联网上的绝大多数示例(实际上是到目前为止我所看到的所有示例)都涉及嵌入式 kafka、EmbeddedKafkaBroker,并显示了在本地一台机器上实现的生产者和消费者。我还没有找到任何可以解释如何与远程 kafka 服务器建立连接并从特定主题读取数据的示例。我已经编写了一些代码,并打印了经纪人地址:
我得到的是127.0.0.1:9092,表示是本地的,所以与远程服务器的连接还没有建立。
另一方面,当我运行 SpringBootApplication 时,我从远程代理获取有效负载。
接收者:
配置:
测试:
我希望有人对以下问题有所了解:
1.EmbeddedKafkaBroker 类的实例能否用于测试来自远程代理的数据,还是仅用于本地测试,我将在本地测试中生成数据,即将数据发送到我自己创建的主题并使用数据?
2.是否可以为真正的kafka服务器编写测试类?例如,验证是否已建立连接,或者是否已从特定主题读取数据。在这种情况下需要哪些注释、配置和类?
3.如果我只想消费数据,是否必须在配置文件中提供生产者配置(这会很奇怪,但到目前为止我遇到的所有示例都是这样做的)?
4.您是否知道任何资源(书籍、网站等)显示使用 kafka 的真实示例,即与远程 kafka 服务器、仅与生产者或消费者一起使用?
java - 如果应用程序包含 @KafkaListener 注释,则 SpringBootTest 失败
我用一个简单的测试创建了一个新的 spring boot 项目:
当我运行此测试时,它成功了。但是,如果我将任何方法注释@KafkaListener
注释添加到任何服务:
并运行测试,它有时会工作并引发异常:
java - 单元测试spring-kafka消费者时如何在KafkaEmbedded中设置端口
我在使用来自集群的消息的应用程序中使用spring-boot-starter-parent
version 1.5.0.RELEASE
、spring-kafka
version1.0.0.RELEASE
和spring-kafka-test
version 。我为我的消费者使用了一个单元测试,但由于代理端口是随机选择的,所以它失败了。有没有办法可以在不更改版本的情况下设置此代理属性?或者我应该使用哪些版本以免破坏任何东西?1.0.0.RELEASE
Kakfa 0.9
KafkaEmbedded
这是KafkaListener
and的代码KafkaConsumerTest
。
监听器.java
KafkaConsumerTest.java(编辑)
java - Spring-kafka-test 使用自定义反序列化测试 JSON 消息
我已经成功地使用 EmbeddedKafka 设置了一个测试,它产生和使用一条消息。以下是测试的工作版本:
这很好用,除非我在to
字段上添加 assertEquals,它在 JSON 中是一个电子邮件地址列表,由分隔,;
并且在类中是一个字符串数组。因此,在EmailMessage
课堂上,我的to
字段注释为@JsonDeserializer
指向拆分的自定义反序列化器;
等。运行应用程序时一切正常,只有测试中断。
我尝试修改上面的代码以更改Producer<String, KafkaEmailMessageWrapper> producer
为private Producer<String, String> producer
,然后在测试中发送 JSON 而不是KafkaEmailMessageWrapper
实例,但随后出现异常:
所以看起来从 JSON 字符串到模型的反序列化由于某种原因在这个测试场景中没有发生。我希望测试尽可能接近真实用例,所以它应该产生一个字符串消息,然后对我的模型类执行反序列化。不知道为什么这没有发生,任何有助于理解为什么会这样的帮助将不胜感激!
为了完成,这是消息侦听器的定义:
编辑
让我重新表述这个问题,因为也许我有点混淆了。该应用程序的概述是:
- 使用 JSON 消息
- 将其序列化为
KafkaEmailMessageWrapper
- 发送电子邮件
这行得通,我向 Kafka 发送了一个 JSON,它被接收(上面的侦听器定义)并正确进行。我唯一的问题是在测试时。
我想在测试中模仿相同的情况,但是当我这样做时:
kafkaTemplate.sendDefault("key", "<json message>");
我得到一个:
但是,当我改为发送KafkaEmailMessageWrapper
实例时:
kafkaTemplate.sendDefault("key", kafkaEmailMessageWrapper);
它可以工作,但是它不使用我@JsonDeserialize
在课堂上指定的注释:
所以在发送实例时,to和cc字段总是空的,因为字段中已经是字符串数组,所以拆分的反序列化逻辑;
没有做任何事情。这是反序列化方法:
spring-kafka - SASL_SSL 与 EmbeddedKafka 的集成
我一直在关注这篇博文以实现嵌入式 sasl_ssl https://sharebigdata.wordpress.com/2018/01/21/implementing-sasl-plain/
当我使用 PLAINTEXT://localhost:9093 配置时,我得到以下信息:
WARN org.apache.kafka.clients.NetworkClient - [Controller id=0, targetBrokerId=0] Connection to node 0 terminated during authentication. This may indicate that authentication failed due to invalid credentials.
但是,当我删除它时,我得到org.apache.kafka.common.KafkaException: Tried to check server's port before server was started or checked for port of non-existing protocol
我尝试更改 SecurityProtocol 类型以自动发现它应该使用哪种类型的代理通信(它被硬编码为纯文本 - 这可能应该得到修复):
我仍然收到以下错误:WARN org.apache.kafka.clients.NetworkClient - [Controller id=0, targetBrokerId=0] Connection to node 0 terminated during authentication. This may indicate that authentication failed due to invalid credentials.
有没有办法正确配置嵌入式 kafka 以启用 sasl_ssl?
spring-kafka - IntegrationTests 的正确@EmbeddedKafka 用法是什么?
我已经阅读了 spring-kafka 文档、我找到的示例和一半的 stackoverflow,但我不明白 @EmbeddedKafka 应该由谁工作。特别是对于集成测试。
我必须在测试 application.properties 中添加以下内容:
有了这一切,当测试它执行时,它抱怨几行关于 kafka 不可访问:
但是在屏幕的一半之后,测试成功并完成了最后一条令人不快的消息:
问题:1/为什么在每个方法开始时都会有关于代理不可用的消息,即使上下文没有脏并重新初始化?2/为什么会出现带有未关闭线程的丑陋消息?怎么了?