是否有任何示例项目展示了如何将 Kafka 与 Micronaut 一起使用?我在让它工作时遇到问题。
我有以下制作人:
@KafkaClient
interface AppClient {
@Topic("topic-name")
void sendMessage(@KafkaKey String id, Event event)
}
和听众:
@KafkaListener(
groupId="group-id",
offsetReset = OffsetReset.EARLIEST
)
class AppListener {
@Topic("topic-name")
void onMessage(Event event) {
// do stuff
}
}
我的 application.yml 包含:
kafka:
bootstrap:
servers: localhost:2181
和 application-test.yml (这是正确的,它应该与 application.yml 位于同一目录中吗?也不确定应该如何使用嵌入式服务器):
kafka:
# embedded:
# enabled: true
# topics: promo-api-promotions
bootstrap:
servers: localhost:9092
我的测试看起来像:
@MicronautTest
class AppSpec extends Specification {
@Shared
@AutoCleanup
EmbeddedServer server = ApplicationContext.run(EmbeddedServer)
@Shared
private AppClient appClient =
server.applicationContext.getBean(AppClient)
def 'The upload endpoint is called'() {
// test here
appClient.sendMessage(id, event)
// other test stuff
}
我遇到的主要问题是:
我的消费者没有从我的主题中消费。我可以看到生产者在 Kafka 中创建了主题并创建了客户端组,但偏移量保持在 0。
我在启动测试时遇到问题,看起来好像创建了两个客户端实例,因此 MBean 注册失败(另外,如果我尝试使用嵌入式 Kafka,我会收到关于端口 9092 的不同消息正在使用中,因为它尝试启动服务器两次):
javax.management.InstanceAlreadyExistsException: com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) 上的 kafka.consumer:type=app-info,id=app-kafka-client-app-listener .jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)