我正在使用 Mutiny 扩展(用于 Quarkus),但我不知道如何解决这个问题。
我想以异步方式发送许多请求,所以我读过 Mutiny 扩展。但是服务器关闭了连接,因为它收到了数千个。
所以我需要:
- 分块发送请求
- 发送所有请求后,执行操作。
我一直在使用 Uni 对象来组合所有响应,如下所示:
Uni<Map<Integer, String>> uniAll = Uni.combine()
.all()
.unis(list)
.combinedWith(...);
接着:
uniAll.subscribe()
.with(...);
此代码并行发送所有请求,以便服务器关闭连接。
我正在使用一组 Multi 对象,但我不知道如何使用它(在 Mutiny 文档中我找不到任何示例)。
这就是我现在正在做的方式:
//Launch 1000 request
for (int i=0;i<1000;i++) {
multi = client.getAbs("https://api.*********.io/jokes/random")
.as(BodyCodec.jsonObject())
.send()
.onItem().transformToMulti(
array -> Multi.createFrom()
.item(array.body().getString("value")))
.group()
.intoLists()
.of(100)
.subscribe()
.with(a->{
System.out.println("Value: "+a);
});
}
我认为订阅不会执行,直到有“100”组项目,但我想这不是方式,因为它不起作用。
有谁知道如何以 100 个块启动数千个异步请求?
提前致谢。
2012 年 4 月 19 日更新
我试过这种方法:
List<Uni<String>> listOfUnis = new ArrayList<>();
for (int i=0;i<1000;i++) {
listOfUnis.add(client
.getAbs("https://api.*******.io/jokes/random")
.as(BodyCodec.jsonObject())
.send()
.onItem()
.transform(item -> item
.body()
.getString("value")));
}
Multi<Uni<String>> multiFormUnis = Multi.createFrom()
.iterable(listOfUnis);
List<String> listOfResponses = new ArrayList<>();
List<String> listOfValues = multiFormUnis.group()
.intoLists()
.of(100)
.onItem()
.transformToMultiAndConcatenate(listOfOneHundred ->
{
System.out.println("Size: "+listOfOneHundred.size());
for (int index=0;index<listOfOneHundred.size();index++) {
listOfResponses.add(listOfOneHundred.get(index)
.await()
.indefinitely());
}
return Multi.createFrom()
.iterable(listOfResponses);
})
.collectItems()
.asList()
.await()
.indefinitely();
for (String value : listOfValues) {
System.out.println(value);
}
当我把这条线:
listOfResponses.add(listOfOneHundred.get(index)
.await()
.indefinitely());
响应一个接一个地打印,当前 100 组项目结束时,它会打印下一组。问题?有顺序请求,需要很多时间
我想我已经接近解决方案了,但我需要知道,如何仅以 100 组为一组发送并行请求,因为如果我输入:
subscribe().with()
所有请求都是并行发送的(而不是 100 组)