问题标签 [embedded-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
422 浏览

kafka-consumer-api - 嵌入式 Kafka 未显示消费者偏移量

有一个嵌入式 kafka 实例作为测试的一部分运行。我正在尝试验证是否已阅读所有消息,但从 kafka 管理客户端得到一个空结果。

地图总是空的。我已经尝试设置 ack all 并设置 100ms autoOffsetCommit 等待看看这是否有任何区别,但没有运气。

0 投票
1 回答
157 浏览

spring-boot - 在流处理器消费者中模拟 SchemaRegistryClient

我有一个基于 Reactor 的 Spring Boot Kafka 流处理应用程序,我正在为其编写集成测试。我正在使用 Spring 的@EmbeddedKafka代理。它工作得很好,我让它覆盖了在我的反应式处理器的消费者和发布者上配置的引导代理 url,但我还没有弄清楚如何在测试时为我的处理器处理模式注册表。我正在使用 ConfluentKafkaAvroSerializerKafkaAvroDeserializer类,并且只schema.registry.url在我的 Spring 应用程序配置中配置了该字段以注入到 Kafka 属性中。我正在使用 Confluent 的MockSchemaRegistryClient对于测试生产者和消费者,但我需要一种将这个模拟客户端注入我的流处理器代码中的实际消费者和生产者的方法,但我认为没有办法做到这一点。几乎似乎我需要更像模式注册表的嵌入式版本来指向他们喜欢嵌入式代理。我们的构建管道不支持启动容器,否则我会使用 Docker 或 Testcontainers。其他人已经解决了这个问题吗?任何帮助或建议表示赞赏。

0 投票
1 回答
300 浏览

spring-kafka - 使用 EmbeddedKafka 进行单元测试

我正在尝试使用 Springboot 和 EmbeddedKafka 为单元测试设置一个类。我有两个主题,topicA 和 topicB,我将测试消息生成到 topicA 和 topicB。

所以这是我的课:

现在,如果我运行测试,produceIntoTopicB 测试会失败并出现以下错误:

而另一个测试失败并出现此错误:

我哪里错了?

0 投票
1 回答
322 浏览

spring-kafka - 如何测试 Spring Kafka Streams

我使用这种方法编写了这个流应用程序:一个StreamConfigs类:

和这个其他类

在类的kStream()方法中,MyStream有我想要测试的应用程序的流逻辑。

所以我写了这个类进行测试,使用嵌入式kafka

这是testStream()方法

这不起作用,因为我在这行代码上遇到错误

这是堆栈跟踪

如何获取StreamBuilder实例以便重用我kStream()的测试方法和嵌入式代理?如果我尝试使用 来创建StreamBuilder实例new,我的测试会停留在等待我的机器上的活动代理。

你能帮助我吗?

编辑:我认为这解决了问题:我以这种方式编辑类的setupKafka()方法MyStreamApplicationTest

0 投票
1 回答
318 浏览

java - NPE 使用嵌入式 Kafka 测试 Kafka Producer

我编写了一个基本的 spring boot 服务,它通过 rest API 使用一些数据并将其发布到 rabbitmq 和 kafka。

为了测试处理 kafka 生产的服务类,我遵循了这个指南:https ://www.baeldung.com/spring-boot-kafka-testing

单独来看,测试(KafkaMessagingServiceImplTest)在 intellij idea 和命令行上的 mvn 中都能完美运行。在想法中运行所有项目测试工作正常。但是,当我在命令行上通过 maven 运行所有项目测试时,该测试在尝试对有效负载字符串进行断言时失败并出现 NPE。

我已将根本问题的位置缩小到另一个测试类 (AppPropertiesTest),该测试类仅测试我的 AppProperties 组件(这是我用来以整洁的方式从 application.properties 中提取配置的组件)。当且仅当该测试类中的测试与使用项目根目录中的“mvn clean install”的失败测试一起运行时,NPE 才会出现。注释掉此类中的测试或使用 @DirtiesContext 对其进行注释可以解决问题。显然,此测试类加载到 spring 上下文中的某些内容会导致另一个测试中事件/倒计时的时间/顺序出现问题。当然,我不想使用@DirtiesContext,因为随着项目复杂性的增加,它会导致构建速度变慢。它也没有解释问题..我无法处理:)

AppPropertiesTest 使用构造函数注入来注入 AppProperties 组件。它还扩展了一个抽象类“GenericServiceTest”,其注释为:

并且不包含任何其他内容。您可能知道,SpringBootTest 注释在样板文件中构建了一个测试弹簧上下文和连线,以允许有效测试弹簧应用程序的依赖注入等,而 TestConstructor 注释允许在我的一些测试中注入构造函数。FWIW,我尝试删除 TestConstructor 注释并在 AppProperties 类中使用普通的旧自动装配来查看它是否有所作为,但它没有。

失败的测试类还扩展了 GenericServiceTest,因为它需要 spring 上下文来注入一些依赖项,例如消费者和正在测试的消息传递服务以及其中的 AppProperties 实例等。

所以我知道问题出在哪里,但我不知道问题是什么。即使 NPE 测试失败,根据 Baeldung 指南,我也可以在日志中看到消费者在失败前成功使用了消息:

然而,当我们回到断言时,payLoad 为空。我在失败的测试中尝试了各种类似 Thread.sleep() 的方法来给它更多时间,并且我增加了 await() 超时但没有任何乐趣。

我觉得奇怪的是,测试在 IDEA 中和孤立的情况下都很好。现在它开始让我有点发疯,我无法调试它,因为我的 IDE 中没有出现问题。

如果有人有任何想法,将不胜感激!

谢谢。

编辑:有人非常合理地建议我添加一些代码,所以这里是:)

失败的测试(在 assertTrue(payload.contains(testMessage)) 失败,因为 payLoad 为空)。自动装配的 kafkaMessagingService 只需注入 AppProperties 和 KakfaTemplate 的依赖项并调用 kafkaTemplate.send():

TestConsumer(用于在上面的测试中消费)

项目依赖:

AppPropertiesTest 类(其上下文似乎导致了问题)

AppPropertiesTest 类正在测试的 AppProperties 类:

两个测试都扩展的通用服务测试类。

失败(您可以在上面的行中看到有效负载已被接收并打印出来)。

0 投票
1 回答
611 浏览

spring-boot - Spring Cloud Stream 反序列化来自 Kafka 主题的无效 JSON

我正在努力将 Spring Cloud Streams 与 Kafka binder 集成。目标是我的应用程序使用主题中的 json 并将其反序列化为 Java 对象。我正在使用功能样式方法而不是命令式方法。我的代码使用结构良好的 json 输入。

另一方面,当我发送无效的 json 时,我希望触发错误记录方法。这在某些测试用例中有效,而在其他测试用例中无效。我的应用程序对 json 进行反序列化,即使它无效并触发包含逻辑的方法,而不是错误记录方法。

我无法解决为什么框架反序列化一些非结构化 json 输入的问题。

rejectCorruptedMessage测试方法中的json ,triggershandleError(ErrorMessage errorMessage)方法,这是预期的,因为它是无效的json。另一方面,rejectCorruptedMessage2测试方法中的 json 触发Consumer<KafkaEventRecord> consumer()了 TokenEventConsumer 类中的方法,这不是预期的行为,但是,我得到了具有空值的 KafkaEventRecord 对象。

0 投票
0 回答
64 浏览

spring - 侦听器在使用 EmbeddedKafka Spring 进行测试时不使用消息

我试图查看当消费者收到来自 kafka 主题的消息但测试没有通过并且消费者甚至没有收到消息时是否调用了服务。

我的测试:

主包中的消费者是具有@KafkaListener(topics = "topic") 的普通消费者。然后我有一个配置文件:

而且在 application.properties (在测试包内)我把这个:

春天:kafka:消费者:自动偏移重置:最早的组ID:组

0 投票
1 回答
270 浏览

integration-testing - spring-boot 嵌入式 kafka 问题:我收到无效接收(大小 = 369296129 大于 104857600)

  1. 我正在使用 spring boot、嵌入式 kafka 和 temporal 编写集成测试用例。我正在尝试发送有关 kafka 主题的消息。
  1. 但我得到了错误。
  1. 我还在 kafka.properties 中添加了以下配置,但我遇到了与上述相同的问题。

我是kafka的新手,请帮助我。

0 投票
0 回答
243 浏览

java - @Embedded Kafka - 使用构造函数创建主题与启用自动创建主题

嗨,我正在使用 @EmbeddedKafka 在 Spring Boot 中进行集成测试。使用以下两种方法创建主题时面临挑战。

  • 在@EmbeddedKafka 注解中使用主题属性
  • 使用auto.create.topic.enable=true

使用auto.create.topic.enable=true时,创建主题大约需要 1.5 分钟。

使用属性创建主题时,它工作正常

我想知道使用两种方法有什么区别,为什么自动创建方法需要很长时间????

0 投票
0 回答
186 浏览

spring-boot - EmbeddedKafka Spring 启动测试仅在 Github 操作上失败,但在本地失败

我正在使用带有 Kafka 和弹性的 Spring Boot 为 Groovy 创建一个演示应用程序。我在 Spock 测试中使用了 @EmbeddedKafka 注释,它们在本地工作得非常好;在 Windows 和 Ubuntu 上。它们通过运行或调试在 Intellij 中工作,没有问题。在我的外壳“./gradlew test”中尝试时也是如此。万事皆安。一旦我将它推送到 github.com,我的 github 操作就失败了。但它正在调用相同的命令。

动作定义:https ://github.com/besessener/GroovySpringBootKafkaElasticsearchDemo/blob/main/.github/workflows/test.yml

远程失败测试用例: https ://github.com/besessener/GroovySpringBootKafkaElasticsearchDemo/blob/main/src/test/groovy/me/spring/GroovyDemo/stream/KafkaSendAndReceiveTest.groovy

行动:https ://github.com/besessener/GroovySpringBootKafkaElasticsearchDemo/runs/3019862203?check_suite_focus=true

从操作输出来看,对我来说唯一看起来像错误的是:

我读了很多关于不使用静态端口进行 kafka 测试的内容。但这是我唯一的 kafka 测试,所以我不太明白应该如何发生冲突。此外,LEADER_NOT_AVAILABLE 可能是不存在主题的问题,或者消费者可能根本无法正确连接到代理。但我没有看到任何这些。

我仍然觉得它与 "localhost:9092" 作为 brokerProperties 更相关。使用 Github 操作时是否有这方面的问题?或者我还缺少什么?