-1

我有一个反应流,它获取一些数据,遍历数据,处理数据,最后将数据写入 Kafka

public Flux<M> sendData(){

     Flux.fromIterable(o.getC()).publishOn(Schedulers.boundedElastic())
                .flatMap(id->
                        Flux.fromIterable(getM(id)).publishOn(Schedulers.boundedElastic())
                                .flatMap( n -> {
                                    return Flux.fromIterable(o.getD()).publishOn(Schedulers.boundedElastic())
                                            .flatMap(d -> return Flux.just(sendToKafka));
                                })
                )
                .doOnError(throwable -> {
                    log.debug("Error while reading data : {} ", throwable.getMessage());
                    return;
                })
                .subscribe();
}


public void run(String... args){
        sendData();
    }

我希望这个工作流程每分钟运行一次。有人可以帮助我了解如何在流中安排这个吗?

4

1 回答 1

1

如果你想每分钟运行一些东西,你可以做这样的事情。

Flux.interval(Duration.ofMinutes(1))
    .onBackpressureDrop()
    .flatMap(n -> sendData())
    .subscribeOn(Schedulers.boundedElastic())
    .subscribe()
于 2021-11-15T18:39:53.453 回答