1

如何处理 Vertx 中的延迟作业列表(实际上是数百个 HTTP GET 请求,以及禁止快速请求主机的有限 API)?现在,我正在使用这段代码,但它被阻止了,因为 Vertx 会立即启动所有请求。最好在每个请求之间有 5 秒的延迟来处理每个请求。

public void getInstrumnetDailyInfo(Instrument instrument,
                                   Handler<AsyncResult<OptionInstrument>> handler) {
  webClient
    .get("/Loader")
    .addQueryParam("i", instrument.getId())
    .timeout(30000)
    .send(
      ar -> {
        if (ar.succeeded()) {
          String html = ar.result().bodyAsString();
          Integer thatData = processHTML(html);
            instrument.setThatData(thatData);
            handler.handle(Future.succeededFuture(instrument));
        } else {
          // error
          handler.handle(Future.failedFuture("error " +ar.cause()));
        }
      });
}

public void start(){
  List<Instrument> instruments = loadInstrumentsList();
  instruments.forEach(
    instrument -> {
      webClient.getInstrumnetDailyInfo(instrument,
            async -> {
              if(async.succeeded()){
                instrumentMap.put(instrument.getId(), instrument);
              }else {
                log.warn("getInstrumnetDailyInfo: ", async.cause());
              }
            });
        });
}
4

2 回答 2

3

您可以考虑使用计时器来触发事件(而不是在启动时全部触发)。

Vertx 有两种变体,

  1. .setTimer()延迟后触发特定事件

    vertx.setTimer(interval, new Handler<T>() {});


2. .setPeriodic()每次经过指定的时间段时触发。

vertx.setPeriodic(interval, new Handler<Long>() {});

setPeriodic似乎是您正在寻找的东西。

您可以从文档中获取更多信息

对于更复杂的 Vertx 调度用例,您可以查看Chime 或其他调度程序此模块

于 2021-10-16T14:17:37.370 回答
1

您可以使用任何开箱即用的速率限制器功能并将其调整为异步使用。

RateLimiter来自 Guava的示例:

    // Make permits available at a rate of one every 5 seconds
    private RateLimiter limiter = RateLimiter.create(1 / 5.0);

    // A vert.x future that completes when it obtains a throttle permit
    public Future<Double> throttle() {
        return vertx.executeBlocking(p -> p.complete(limiter.acquire()), true);
    }

然后...

   throttle()
       .compose(d -> {
           System.out.printf("Waited %.2f before running job\n", d);
           return runJob(); // runJob returns a Future result
       });
于 2021-10-30T19:50:29.427 回答