我尝试运行 Kafka 拓扑测试环境。
生产代码真的很小
@SpringBootApplication
public class ProcessApplication {
public static void main(String[] args) {
SpringApplication.run(ProcessApplication.class, args);
}
@Bean
public Function<Input, Output> process() {
return EventTransformer::transform;
}
}
现在我想在集成测试中测试代码。如果我理解这个概念是正确的,我必须将 new ProcessApplication().process() 包含到 StreamsBuilder 中,但我不知道如何将 java.util.Function 添加到其中。
private Topology createTopology(SpecificAvroSerde specificAvroSerde) {
final Serde<String> stringSerde = new Serdes.StringSerde();
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, Input> input = builder.stream(INPUT_TOPIC,
Consumed.with(stringSerde, specificAvroSerde));
new ProcessApplication().process();
// Compiles and runs but data are missing, assume to add new ProcessApplication().process() somehow
KStream<String, Output> output =
input.mapValues(value -> new Output());
output.to(OUTPUT_TOPIC, Produced.with(stringSerde, specificAvroSerde));
return builder.build();
}
此外,我找到了另一种解决方案,如何一起构建拓扑。
private Topology createTopology()
topology.addSource("mySource", INPUT_TOPIC)
new ProcessApplication().process()
topology.addProcessor("myProcessor", new ProcessorSupplier, "mySource")
topology.addSink(
"mySink",
OUTPUT_TOPIC,
Serdes.String().serializer(),
new SpecificAvroSerde().serializer(),
"myProcessor"
);
return topology
}
即使在这里,我也不知道我应该做什么。更糟糕的是,我有些怀疑自己写的处理器真的是进程还是必须使用ProcessorSupplier。
在 Docker 环境中,生产代码按预期工作。
谢谢,马库斯