3

我正在使用 Kafka Streams、Spring-Kafka 和 Spring Boot 编写流应用程序。我找不到任何信息如何在使用 Spring-Kafka 时正确测试 Kafka Streams DSL 完成的流处理。文档提到 EmbeddedKafkaBroker,但似乎没有关于如何处理例如状态存储的测试的信息。

只是为了提供一些我想测试的简单示例。我注册了以下 bean(其中 Item 是 avro 生成的):


    @Bean
    public KTable<String, Long> itemTotalKTable(StreamsBuilder streamsBuilder) {
        return streamsBuilder
                .stream(ITEM_TOPIC,
                        Consumed.with(Serdes.String(), itemAvroSerde))
                .mapValues((id, item) -> item.getNumber())
                .groupByKey()
                .aggregate(
                        () -> 0L,
                        (id, number, agg) -> agg + number,
                        Materialized.with(Serdes.String(), Serdes.Long()));
    }

测试所有项目编号是否汇总的正确方法是什么?

4

2 回答 2

2

Spring Kafka for Kafka Streams 支持不会带来任何额外的 API,尤其是在流构建及其处理方面。

我们最近为自己打开了一个很好的kafka-streams-test-utils库,可以在没有任何 Kafka 代理启动(甚至嵌入)的情况下用于单元测试。

在我们的几个测试中,我们有这样的东西:

    KStream<String, String> stream = builder.stream(INPUT);
    stream
            .transform(() -> enricher)
            .to(OUTPUT);

    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
    TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config);

    ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(),
            new StringSerializer());
    driver.pipeInput(recordFactory.create(INPUT, "key", "value"));
    ProducerRecord<byte[], byte[]> result = driver.readOutput(OUTPUT);
    assertThat(result.headers().lastHeader("foo")).isNotNull();

我相信应该有一些 APITopologyTestDriver来处理提到的状态存储。

于 2019-09-03T15:44:29.850 回答
0

也许您可以创建一个将您KTable作为参数并调用.toStream().to(topicname,Produced.with(keyserde, valueserde))它的方法,然后您可以执行以下操作:

MyTopologyBuilder builder = new MyTopologyBuilder();

testDriver = new TopologyTestDriver(builder.build(), config);

ConsumerRecord<byte[], byte[]> input = createStepRecord(key, record);

testDriver.pipeInput(input);

ProducerRecord<String, String> out testDriver.readOutput(topic, new StringDeserializer(), ew AvroDeserializer<>(MyClass.class);

assertThat(out.key(), is(key));
assertEquals(myPredefinedValue, out.value());
assertEquals(5, out.value().getMyList().size());

这应该可行,但我猜可能有更优雅的方式。

于 2019-09-03T16:16:39.467 回答