0

我正在使用 spring cloud 功能来处理来自带有 Flux 的 kafka 的数据。默认情况下,它在消费者线程(消费消息的地方)中处理数据。我将为并行数据处理和节流实现线程池,并且在 Spring Cloud Integration 中有一个很棒的实现,称为 executorChannel ( https://docs.spring.io/spring-integration/api/org/springframework/integration/channel/ ExecutorChannel.html )

功能实现示例:

public static class FN1 implements Function<Flux<String>, Flux<String>> {
  public Flux<String> apply(Flux<String> data) {
    return data
      .map(f ->  doSomething() )      
  }
}

所以我发现没有简单的方法来连接通过 executorChannel 实现的功能。

Mb 有没有办法定义 inputChannel 类型?

UPD:阅读奥列格回答下的评论。它们非常有用。

4

1 回答 1

1

你的意思是这样的?

@SpringBootApplication
public class SampleFunctoinAppApplication  {

    public static void main(String[] args) throws Exception {

        ApplicationContext context = SpringApplication.run(SampleFunctoinAppApplication.class, args);
        SubscribableChannel output = context.getBean("output", SubscribableChannel.class);
        output.subscribe(System.out::println);

        MessageChannel channel = context.getBean("executorChannel", MessageChannel.class);
        channel.send(new GenericMessage<String>("hello"));
    }

    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows
                .from("executorChannel")
                .transform(echo())
                .channel("output")
                .get();
    }

    @Bean
    public ExecutorChannel executorChannel() {
        return new ExecutorChannel(Executors.newCachedThreadPool());
    }

    public Function<String, String> echo() {
        return v -> v;
    }
}

“定义 inputChannel 类型”是什么意思?

于 2020-04-13T13:34:52.617 回答