我使用 Spring 云函数方法传输 Flux 创建了 3 个简单的 Spring 云流应用程序(源/处理器/接收器)。
源应用程序:
@SpringBootApplication
public class SourceApplication {
public static void main(String[] args) {
SpringApplication.run(SourceApplication.class, args);
}
@PollableBean
public Supplier<Flux<String>> stringSupplier() {
return () -> {
String v1 = String.valueOf("abc");
String v2 = String.valueOf("pqr");
String v3 = String.valueOf("xyz");
return Flux.just(v1, v2, v3);
};
}
}
处理器应用程序:
@SpringBootApplication
public class ProcessorApplication {
@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
return flux -> flux.map(value -> value.toUpperCase()).log();
}
public static void main(String[] args) {
SpringApplication.run(ProcessorApplication.class, args);
}
}
接收器应用程序:
@SpringBootApplication
public class SinkApplication {
public static void main(String[] args) {
SpringApplication.run(SinkApplication.class, args);
}
@Bean
public Consumer<Flux<String>> log() {
return flux -> {
flux.subscribe(f -> System.out.println("Received data: " + f));
};
}
}
我添加的依赖项是:
SpringBoot version = 2.2.6.RELEASE
implementation(platform("org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR5"))
implementation(platform("org.springframework.cloud:spring-cloud-stream-dependencies:Horsham.SR5"))
implementation("org.springframework.cloud:spring-cloud-starter-stream-rabbit")
implementation("org.springframework.cloud:spring-cloud-starter-function-webflux:3.0.7.RELEASE")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.springframework.boot:spring-boot-starter-webflux")
我已经在 Spring Cloud Data Flow 中注册了这些应用程序并部署在流中。
我能够将数据传输到这些应用程序,并分别通过 HTTP 和 RabbitMQ 接收输出。但是,消息不会在应用程序之间进行通信(源->处理器->接收器)。我是否缺少任何依赖项、注释或应用程序属性。
目前我的应用程序属性文件是完全空的。