问题标签 [spring-kafka-test]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
1713 浏览

spring - Spring KafkaListener:如何知道它何时准备就绪

我有一个简单的 Spring Boot 应用程序,它从 Kafka 读取并写入 Kafka。我写了一个SpringBootTest使用 anEmbeddedKafka来测试所有这些。

主要问题是:有时测试失败是因为测试过早发送Kafka消息。这样,在 Spring 应用程序(或者KafkaListener准确地说)准备好之前,消息就已经写入 Kafka。由于侦听器从latest偏移量读取(我不想为我的测试更改任何配置 - 除了 bootstrap.servers),它不会收到该测试中的所有消息。

有谁知道我如何在测试中知道它KafkaListener已准备好接收消息?

我能想到的唯一方法是等到/health可用,但我不知道我是否可以确定这意味着KafkaListener要准备好。

任何帮助是极大的赞赏!

此致。

0 投票
1 回答
2439 浏览

spring - Spring Kafka @SendTo 不发送标头

我正在使用 向 Kafka 发送消息,ReplyingKafkaTemplate并且它正在使用kafka_correlationId. 但是,当它点击我的@KafkaListener方法并将其转发到回复主题时,标题会丢失。

如何保留 kafka 标头?

这是我的方法签名:

我已经创建了一个ProducerInterceptor,所以我可以看到从ReplyingKafkaTemplate以及@SendTo注释中发送了哪些标头。从那开始,另一个奇怪的事情是ReplyingKafkaTemplate没有将记录的kafka_replyTopic标题添加到消息中。

以下ReplyingKafkaTemplate是配置的方式:

我不确定这是否相关,但我也添加了Spring Cloud Sleuth作为依赖项,并且当我发送消息时 span/trace 标头在那里,但是在转发消息时会生成新的标头。

0 投票
1 回答
2517 浏览

unit-testing - 如何在 spring-boot 应用程序中使用 @KafkaListener 注释测试方法?

我有一个spring带有@KafkaListener方法的组件:

现在,我想测试这个方法。我想确保此方法正确接收消息。我尝试创建a Unit test

但我不明白接下来会发生什么。我该如何测试这种方法?

0 投票
1 回答
1661 浏览

spring-kafka - 如何触发和处理这些 Spring Kafka 事件

在我的 Spring Boot 项目中,我有许多 Spring Kafka 消费者,我添加了一些事件侦听器来监控这些消费者的健康状况。这是代码:

有谁知道在什么情况下会抛出这些事件(也许我如何手动触发这些事件以进行测试)?对于最后三个事件(ConsumerStoppedEvent、ListenerContainerIdleEvent 和 NonResponsiveConsumerEvent),当我得到其中一个时,是否需要人工干预来解决问题(例如重新启动服务器以再次创建消费者)?谢谢!

0 投票
2 回答
1784 浏览

spring-kafka - 如何针对在服务器上运行的真实 kafka 代理测试 kafka 消费者?

我很难理解 Java Spring Boot 中的一些 Kafka 概念。我想针对在服务器上运行的真实 Kafka 代理测试消费者,该服务器有一些生产者写入/已经将数据写入各种主题。我想与服务器建立连接,使用数据,并在测试中验证或处理其内容。

互联网上的绝大多数示例(实际上是到目前为止我所看到的所有示例)都涉及嵌入式 kafka、EmbeddedKafkaBroker,并显示了在本地一台机器上实现的生产者和消费者。我还没有找到任何可以解释如何与远程 kafka 服务器建立连接并从特定主题读取数据的示例。我已经编写了一些代码,并打印了经纪人地址:

我得到的是127.0.0.1:9092,表示是本地的,所以与远程服务器的连接还没有建立。

另一方面,当我运行 SpringBootApplication 时,我从远程代理获取有效负载。

接收者:

配置:

测试:

我希望有人对以下问题有所了解:

1.EmbeddedKafkaBroker 类的实例能否用于测试来自远程代理的数据,还是仅用于本地测试,我将在本地测试中生成数据,即将数据发送到我自己创建的主题并使用数据?

2.是否可以为真正的kafka服务器编写测试类?例如,验证是否已建立连接,或者是否已从特定主题读取数据。在这种情况下需要哪些注释、配置和类?

3.如果我只想消费数据,是否必须在配置文件中提供生产者配置(这会很奇怪,但到目前为止我遇到的所有示例都是这样做的)?

4.您是否知道任何资源(书籍、网站等)显示使用 kafka 的真实示例,即与远程 kafka 服务器、仅与生产者或消费者一起使用?

0 投票
1 回答
514 浏览

java - 如果应用程序包含 @KafkaListener 注释,则 SpringBootTest 失败

我用一个简单的测试创建了一个新的 spring boot 项目:

当我运行此测试时,它成功了。但是,如果我将任何方法注释@KafkaListener注释添加到任何服务:

并运行测试,它有时会工作并引发异常:

0 投票
2 回答
5829 浏览

java - 单元测试spring-kafka消费者时如何在KafkaEmbedded中设置端口

我在使用来自集群的消息的应用程序中使用spring-boot-starter-parentversion 1.5.0.RELEASEspring-kafkaversion1.0.0.RELEASEspring-kafka-testversion 。我为我的消费者使用了一个单元测试,但由于代理端口是随机选择的,所以它失败了。有没有办法可以在不更改版本的情况下设置此代理属性?或者我应该使用哪些版本以免破坏任何东西?1.0.0.RELEASEKakfa 0.9KafkaEmbedded

这是KafkaListenerand的代码KafkaConsumerTest

监听器.java

KafkaConsumerTest.java编辑

0 投票
1 回答
1699 浏览

java - Spring-kafka-test 使用自定义反序列化测试 JSON 消息

我已经成功地使用 EmbeddedKafka 设置了一个测试,它产生和使用一条消息。以下是测试的工作版本:

这很好用,除非我在to字段上添加 assertEquals,它在 JSON 中是一个电子邮件地址列表,由分隔,;并且在类中是一个字符串数组。因此,在EmailMessage课堂上,我的to字段注释为@JsonDeserializer指向拆分的自定义反序列化器;等。运行应用程序时一切正常,只有测试中断。

我尝试修改上面的代码以更改Producer<String, KafkaEmailMessageWrapper> producerprivate Producer<String, String> producer,然后在测试中发送 JSON 而不是KafkaEmailMessageWrapper实例,但随后出现异常:

所以看起来从 JSON 字符串到模型的反序列化由于某种原因在这个测试场景中没有发生。我希望测试尽可能接近真实用例,所以它应该产生一个字符串消息,然后对我的模型类执行反序列化。不知道为什么这没有发生,任何有助于理解为什么会这样的帮助将不胜感激!

为了完成,这是消息侦听器的定义:

编辑

让我重新表述这个问题,因为也许我有点混淆了。该应用程序的概述是:

  • 使用 JSON 消息
  • 将其序列化为KafkaEmailMessageWrapper
  • 发送电子邮件

这行得通,我向 Kafka 发送了一个 JSON,它被接收(上面的侦听器定义)并正确进行。我唯一的问题是在测试时。

我想在测试中模仿相同的情况,但是当我这样做时:

kafkaTemplate.sendDefault("key", "<json message>");

我得到一个:

但是,当我改为发送KafkaEmailMessageWrapper实例时:

kafkaTemplate.sendDefault("key", kafkaEmailMessageWrapper);

它可以工作,但是它不使用我@JsonDeserialize在课堂上指定的注释:

所以在发送实例时,to和cc字段总是空的,因为字段中已经是字符串数组,所以拆分的反序列化逻辑;没有做任何事情。这是反序列化方法:

0 投票
0 回答
2888 浏览

spring-kafka - SASL_SSL 与 EmbeddedKafka 的集成

我一直在关注这篇博文以实现嵌入式 sasl_ssl https://sharebigdata.wordpress.com/2018/01/21/implementing-sasl-plain/

当我使用 PLAINTEXT://localhost:9093 配置时,我得到以下信息: WARN org.apache.kafka.clients.NetworkClient - [Controller id=0, targetBrokerId=0] Connection to node 0 terminated during authentication. This may indicate that authentication failed due to invalid credentials.

但是,当我删除它时,我得到org.apache.kafka.common.KafkaException: Tried to check server's port before server was started or checked for port of non-existing protocol

我尝试更改 SecurityProtocol 类型以自动发现它应该使用哪种类型的代理通信(它被硬编码为纯文本 - 这可能应该得到修复):

我仍然收到以下错误:WARN org.apache.kafka.clients.NetworkClient - [Controller id=0, targetBrokerId=0] Connection to node 0 terminated during authentication. This may indicate that authentication failed due to invalid credentials.

有没有办法正确配置嵌入式 kafka 以启用 sasl_ssl?

0 投票
0 回答
268 浏览

spring-kafka - IntegrationTests 的正确@EmbeddedKafka 用法是什么?

我已经阅读了 spring-kafka 文档、我找到的示例和一半的 stackoverflow,但我不明白 @EmbeddedKafka 应该由谁工作。特别是对于集成测试。

我必须在测试 application.properties 中添加以下内容:

有了这一切,当测试它执行时,它抱怨几行关于 kafka 不可访问:

但是在屏幕的一半之后,测试成功并完成了最后一条令人不快的消息:

问题:1/为什么在每个方法开始时都会有关于代理不可用的消息,即使上下文没有脏并重新初始化?2/为什么会出现带有未关闭线程的丑陋消息?怎么了?