问题标签 [embedded-kafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
maven - 使用 EmbeddedKafka 进行测试 - 测试报告
我正在使用EmbeddedKafka
在我的微服务中测试与 Kafka 的集成。而且,我正在使用 jacoco 发布测试结果。为了使测试运行没有任何问题,我必须将forkCount
参数设置为0
.
如果没有,它会失败并出现以下错误:
MojoFailureException:启动fork时出错,请检查日志中的输出
问题是因为我将 forkCount 设置为0
,所以 jacoco 插件无法生成报告,因为它无法以javaagent
. 有解决方法吗?
spring-boot - 如何在 SpringBoot Application 中添加嵌入式 kafka
我在我的项目中使用 Kafka 和 Springboot,现在我有外部 Kafka 集群和 SpringBoot 微服务项目。
使用 yml 文件,我成功创建了 Kafka 生产者和消费者,然后应用程序和 kafka 都可以正常通信。
现在我想在我的 sprintboot 项目中使用嵌入式 kafka。类似于在 Activemq 代码之后嵌入的东西。
我用谷歌搜索并阅读了很多相同的文章,但无法获得清晰的图片。在这里,我不希望这个嵌入式服务器仅用于 junit 测试,而是
希望在我的生态系统中不设置任何外部 Kafka 组件的情况下进行功能测试。
我真的不知道该怎么做,有人可以帮助我吗?
提前致谢
spring-boot-test - 如何修复无法找到 meta.properties 的嵌入式 kafka 的错误
我正在尝试对使用 kafka、kafka-streams 和 cassandra 的应用程序进行集成测试。但是当我尝试设置测试类时,我有 2 个错误: 错误 [main] BrokerMetadataCheckpoint: 无法读取 dir 下的 meta.properties 文件 错误 [main] KafkaServer: 无法读取日志目录下的 meta.properties
我正在使用 spring-boot-starter 2.1.2、spring-boot-starter-test 2.1.2、spring-kafka 2.2.0、spring-kafka-test 2.2.0、apache.kafka-streams 2.1.0
试图更改 logs.dir 和 logs.dirs 参数。使用@EnableKafka @EnableKafkaStreams
我希望使用嵌入式 kafka 运行上下文,但现在我收到一个错误,即 meta.properties 不存在
spring-boot - 在 Spring Boot Junit 中推入 @EmbeddedKafka 后从主题中获取数据
我正在为我的 Spring Boot 应用程序编写 Junit 测试用例(使用 @EmbeddedKafka),该应用程序广泛使用 Spring-kafka 与其他服务和其他操作进行通信。
一个典型的案例是从 kafka 中删除数据(我们在 kafka 中推送空消息)。
目前在 delete() 方法中,我们首先检查 kafka 中是否存在任何请求删除的消息。然后我们在 Kafka 中为该消息键推送null
为上述方法逻辑编写 Junit 的步骤。
这里的问题是 Kafka 总是抛出 message not found 异常。在 service.delete() 方法中。
在控制台中检查日志时。我发现我的生产者配置为 kafka 使用不同的端口,而消费者配置使用不同的端口。
我不确定我是否遗漏了一些细节,或者这种行为的原因是什么。任何帮助将不胜感激。
java - Apache kafka 嵌入式 kafka junit 测试 - 当我运行单元测试时应用程序启动
我正在使用 kafka 在 Spring Boot 中开发异步邮件服务器。
我已经用嵌入式 kafka 编写了测试,它在随机端口中启动自己的 kafka 主题并将其用于测试。
当我启动这个应用程序上下文时,它正在加载并且它期望在我本地的 kafka 集群。我需要停止加载应用程序上下文。我从https://github.com/code-not-found/spring-kafka/blob/master/spring-kafka-unit-test-classrule/src/test/java/com/codenotfound/kafka/producer复制了代码/SpringKafkaSenderTest.java工作得很好。当我在我的项目中遵循相同的风格时,我可以看到实际的应用程序开始了。
SpringKafkaSenderTest .java
spring - 嵌入式 Kafka 代理 IP 未在属性文件中解析
我面临一个问题,即我的 KafkaProducerConfig
获得无效bootstrap.servers
值,因为我的单元测试@PropertySource
没有解析该spring.embedded.kafka.brokers
属性。当我将生产者配置转储到日志时,我得到以下信息:
显然,该属性没有得到解决。假设我有以下嵌入式 Kafka 测试。
我的kafkaTestProperties.properties
文件如下:
embedded-kafka-brokers
最终将ProducerConfig
通过一些底层自动配置提供给 Kafka。
有趣的是,embeddedKafkaBrokers
测试类中的实例字段确实包含嵌入式 kafka 设置的代理 IP。
我已经得出结论,属性源加载顺序存在问题,@EmbeddedKafka
没有及时设置代理 IP 系统属性kafkaTestProperties.properties
以解决它。这个问题是在将我们的代码库移植到 Spring Boot 2 和 Spring Cloud Finchley SR2 后出现的,其中 Spring Kafka API 已经升级。
我试过删除@SpringBootTest
和包装test()
代码SpringApplicationBuilder
无济于事。
关于如何解决这个问题的任何建议?也许我可以利用@AutoConfigurationAfter
或@Order
?有没有办法命令加载属性源?
unit-testing - 如何在 spring-boot 应用程序中使用 @KafkaListener 注释测试方法?
我有一个spring
带有@KafkaListener
方法的组件:
现在,我想测试这个方法。我想确保此方法正确接收消息。我尝试创建a Unit test
:
但我不明白接下来会发生什么。我该如何测试这种方法?
spring-boot - ZooKeeper 会话在测试中过期
我正在使用 EmbeddedKafka 使用以下注释配置来测试我的模块:
它在大多数情况下都有效。
但有时,由于 zookeeper 会话超时,它在创建 spring 上下文时失败:
java.lang.IllegalStateException:无法加载 ApplicationContext 在
...
在 java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 在 java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 在 org.gradle.internal .concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55) at java.base/java.lang.Thread.run(Thread.java:834) 原因:org.springframework.beans.factory.BeanCreationException: 创建 bean 时出错名称为“embeddedKafka”:调用 init 方法失败;嵌套异常是 org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1745) at org.springframework.beans.factory.support。 org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = 会话在 org.apache.zookeeper.KeeperException.create(KeeperException.java:130) 在 kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1631) 在 kafka.zk 过期.KafkaZkClient.registerBroker(KafkaZkClient.scala:87) at kafka.server.KafkaServer.startup(KafkaServer.scala:257) at kafka.utils.TestUtils$.createServer(TestUtils.scala:132) at kafka.utils.TestUtils.createServer (TestUtils.scala) 在 org.springframework.kafka.test.EmbeddedKafkaBroker.afterPropertiesSet(EmbeddedKafkaBroker.java:215) 在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1804) 在 org.springframework。 beans.factory.support.AbstractAutowireCapableBeanFactory。initializeBean(AbstractAutowireCapableBeanFactory.java:1741) ... 82 更多
有人知道为什么会发生吗?为什么在使用 EmbeddedKafka 时甚至需要 ZooKeeper?
任何帮助,将不胜感激。
java - 嵌入式 Kafka Appender 不适用于 Log4j2 xml
我想使用 log4j2 xml 为 Kafka Appender 编写单元测试。我正在使用嵌入式 kafka,但它无法初始化 ${spring.embedded.kafka.brokers} 这个属性。
我收到这个错误
Log4j2.xml
应用程序属性
在上面我试图从 application.properties 获取 bootstrapServers 但它无法初始化 ${spring.embedded.kafka.brokers} 这个属性。另一方面,我得到 securityProtocol 的值,因为它是字符串。先感谢您。
apache-kafka - 在 Spring Cloud Stream 中使用嵌入式 Kafka 进行集成测试时,如何立即验证消息是否已得到确认?
我们使用 Spring Cloud Stream Kafka Binder(与 Project Reactor 集成,即
Flux
流)和手动偏移提交
(即autoCommitOffset = false
)。
我们正在尝试使用来自 spring-kafka-test 的 Embedded Kafka编写一个集成测试,该测试应该通过在测试向我们的主题发送消息之前和之后使用管理客户端 手动读取消费者组偏移量来断言这一切正常。
测试间歇性地失败。使用等待性,我们现在最多等待 10 秒来轮询偏移量,这似乎解决了我们的大部分问题,因为偏移量将在大约 7 秒后改变——但这对于测试来说并不令人满意。
一旦我们通过调用手动确认消息接收,有没有办法确保 Spring Cloud Stream Kafka Binder 立即写入偏移更改Acknowledgement.acknowledge()
?
换句话说:我们如何验证acknowledge
在我们的测试中被调用而无需等待?
我们使用 Kotlin、Mockito 和Mockito-kotlin,因此不能使用 PowerMockito。