0

是否有任何示例项目展示了如何将 Kafka 与 Micronaut 一起使用?我在让它工作时遇到问题。

我有以下制作人:

@KafkaClient
interface AppClient {

@Topic("topic-name")
    void sendMessage(@KafkaKey String id, Event event)
}

和听众:

@KafkaListener(
    groupId="group-id",
    offsetReset = OffsetReset.EARLIEST
)
class AppListener {

@Topic("topic-name")
    void onMessage(Event event) {
        // do stuff
    }
}

我的 application.yml 包含:

kafka:
  bootstrap:
    servers: localhost:2181

和 application-test.yml (这是正确的,它应该与 application.yml 位于同一目录中吗?也不确定应该如何使用嵌入式服务器):

kafka:
  #  embedded:
  #    enabled: true
  #    topics: promo-api-promotions
  bootstrap:
    servers: localhost:9092

我的测试看起来像:

@MicronautTest
class AppSpec extends Specification {

@Shared
@AutoCleanup
EmbeddedServer server = ApplicationContext.run(EmbeddedServer)

@Shared
private AppClient appClient =
        server.applicationContext.getBean(AppClient)

def 'The upload endpoint is called'() {
  // test here
  appClient.sendMessage(id, event)
  // other test stuff
}

我遇到的主要问题是:

  1. 我的消费者没有从我的主题中消费。我可以看到生产者在 Kafka 中创建了主题并创建了客户端组,但偏移量保持在 0。

  2. 我在启动测试时遇到问题,看起来好像创建了两个客户端实例,因此 MBean 注册失败(另外,如果我尝试使用嵌入式 Kafka,我会收到关于端口 9092 的不同消息正在使用中,因为它尝试启动服务器两次):

    javax.management.InstanceAlreadyExistsException: com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) 上的 kafka.consumer:type=app-info,id=app-kafka-client-app-listener .jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)

4

2 回答 2

0

设法解决了第二个问题 - 传递给侦听器的对象没有@JsonCreator。我通过尝试使用 Jackson 对象映射器在玩耍时从它的 JSON 构造对象来发现这一点。

如果其他人有同样的问题 - 在继续之前确保对象模型与杰克逊一起工作!

于 2018-11-02T14:31:53.863 回答
0

您应该将嵌入式配置添加kafka.embedded.enabled到带有配置的地图中,并将其传递给 ApplicationContext.run 方法。

Map<String, Object> config = Collections.
    unmodifiableMap(new HashMap<String, Object>() {
        {
            put(AbstractKafkaConfiguration.EMBEDDED, true);
            put(AbstractKafkaConfiguration.EMBEDDED_TOPICS, "test_topic");
    }
});

try (ApplicationContext ctx = ApplicationContext.run(config)) {

消费者在另一个线程中从 Kafka 消费,您必须等待一段时间,直到您的 AppListener 赶上。您可以在KafkaProducerListenerTest中看到一个简短的示例

记住 Micronaut 文档中描述的 Kafka 依赖项:嵌入 Kafka

于 2018-11-04T17:20:22.643 回答