0

我尝试运行 Kafka 拓扑测试环境。

生产代码真的很小

@SpringBootApplication
public class ProcessApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProcessApplication.class, args);
    }

    @Bean
    public Function<Input, Output> process() {
        return EventTransformer::transform;
    }
}

现在我想在集成测试中测试代码。如果我理解这个概念是正确的,我必须将 new ProcessApplication().process() 包含到 StreamsBuilder 中,但我不知道如何将 java.util.Function 添加到其中。

private Topology createTopology(SpecificAvroSerde specificAvroSerde) { 
    final Serde<String> stringSerde = new Serdes.StringSerde();
    final StreamsBuilder builder = new StreamsBuilder();
    KStream<String, Input> input = builder.stream(INPUT_TOPIC,
            Consumed.with(stringSerde, specificAvroSerde));

    new ProcessApplication().process();

    // Compiles and runs but data are missing, assume to add new ProcessApplication().process() somehow
    KStream<String, Output> output =
            input.mapValues(value -> new Output());
    output.to(OUTPUT_TOPIC, Produced.with(stringSerde, specificAvroSerde));
    
    return builder.build();
}

此外,我找到了另一种解决方案,如何一起构建拓扑。

private Topology createTopology()
    topology.addSource("mySource", INPUT_TOPIC)
    new ProcessApplication().process()
    topology.addProcessor("myProcessor", new ProcessorSupplier, "mySource")
    topology.addSink(
            "mySink",
            OUTPUT_TOPIC,
            Serdes.String().serializer(),
            new SpecificAvroSerde().serializer(),
            "myProcessor"
    );
    
    return topology
}

即使在这里,我也不知道我应该做什么。更糟糕的是,我有些怀疑自己写的处理器真的是进程还是必须使用ProcessorSupplier。

在 Docker 环境中,生产代码按预期工作。

谢谢,马库斯

4

0 回答 0