我有一个反应流,它获取一些数据,遍历数据,处理数据,最后将数据写入 Kafka
public Flux<M> sendData(){
Flux.fromIterable(o.getC()).publishOn(Schedulers.boundedElastic())
.flatMap(id->
Flux.fromIterable(getM(id)).publishOn(Schedulers.boundedElastic())
.flatMap( n -> {
return Flux.fromIterable(o.getD()).publishOn(Schedulers.boundedElastic())
.flatMap(d -> return Flux.just(sendToKafka));
})
)
.doOnError(throwable -> {
log.debug("Error while reading data : {} ", throwable.getMessage());
return;
})
.subscribe();
}
public void run(String... args){
sendData();
}
我希望这个工作流程每分钟运行一次。有人可以帮助我了解如何在流中安排这个吗?