我正在使用 spring cloud 功能来处理来自带有 Flux 的 kafka 的数据。默认情况下,它在消费者线程(消费消息的地方)中处理数据。我将为并行数据处理和节流实现线程池,并且在 Spring Cloud Integration 中有一个很棒的实现,称为 executorChannel ( https://docs.spring.io/spring-integration/api/org/springframework/integration/channel/ ExecutorChannel.html )
功能实现示例:
public static class FN1 implements Function<Flux<String>, Flux<String>> {
public Flux<String> apply(Flux<String> data) {
return data
.map(f -> doSomething() )
}
}
所以我发现没有简单的方法来连接通过 executorChannel 实现的功能。
Mb 有没有办法定义 inputChannel 类型?
UPD:阅读奥列格回答下的评论。它们非常有用。