0

我正在使用 Mutiny 扩展(用于 Quarkus),但我不知道如何解决这个问题。

我想以异步方式发送许多请求,所以我读过 Mutiny 扩展。但是服务器关闭了连接,因为它收到了数千个。

所以我需要:

  • 分块发送请求
  • 发送所有请求后,执行操作。

我一直在使用 Uni 对象来组合所有响应,如下所示:

Uni<Map<Integer, String>> uniAll = Uni.combine()
   .all()
   .unis(list)
   .combinedWith(...);

接着:

uniAll.subscribe()
      .with(...);

此代码并行发送所有请求,以便服务器关闭连接。

我正在使用一组 Multi 对象,但我不知道如何使用它(在 Mutiny 文档中我找不到任何示例)。

这就是我现在正在做的方式:

          //Launch 1000 request
          for (int i=0;i<1000;i++) {
            multi = client.getAbs("https://api.*********.io/jokes/random")
                      .as(BodyCodec.jsonObject())
                      .send()
                      .onItem().transformToMulti(
                              array -> Multi.createFrom()
                                       .item(array.body().getString("value")))
                                       .group()
                                       .intoLists()
                                       .of(100)
                                       .subscribe()
                                       .with(a->{
                                         System.out.println("Value: "+a);
                                        });
          }

我认为订阅不会执行,直到有“100”组项目,但我想这不是方式,因为它不起作用。

有谁知道如何以 100 个块启动数千个异步请求?

提前致谢。

2012 年 4 月 19 日更新

我试过这种方法:

    List<Uni<String>> listOfUnis = new ArrayList<>();
    for (int i=0;i<1000;i++) {
        listOfUnis.add(client
                       .getAbs("https://api.*******.io/jokes/random")
                       .as(BodyCodec.jsonObject())
                       .send()
                       .onItem()
                       .transform(item -> item
                                        .body()
                                        .getString("value")));
      }


    Multi<Uni<String>> multiFormUnis = Multi.createFrom()
                                            .iterable(listOfUnis);

    List<String> listOfResponses = new ArrayList<>();

    List<String> listOfValues = multiFormUnis.group()
                 .intoLists()
                 .of(100)
                 .onItem()
                 .transformToMultiAndConcatenate(listOfOneHundred -> 
                 {
                     System.out.println("Size: "+listOfOneHundred.size());
                     
                     for (int index=0;index<listOfOneHundred.size();index++) {
                         listOfResponses.add(listOfOneHundred.get(index)
                                                             .await()
                                                             .indefinitely());                          
                     }                       
                
                return Multi.createFrom()
                            .iterable(listOfResponses);
                 })
                 .collectItems()
                 .asList()
                 .await()
                 .indefinitely();

    for (String value : listOfValues) {
        System.out.println(value);
    }

当我把这条线:

 listOfResponses.add(listOfOneHundred.get(index)
                                     .await()
                                     .indefinitely()); 

响应一个接一个地打印,当前 100 组项目结束时,它会打印下一组。问题?有顺序请求,需要很多时间

我想我已经接近解决方案了,但我需要知道,如何仅以 100 组为一组发送并行请求,因为如果我输入:

subscribe().with()

所有请求都是并行发送的(而不是 100 组)

4

2 回答 2

1

我认为你创造了很多错误,使用它会更容易:

Multi<String> multiOfJokes = Multi.createFrom().emitter(multiEmitter -> {
        for (int i=0;i<1000;i++) {
            multiEmitter.emit(i);
        }
        multiEmitter.complete();
    }).onItem().transformToUniAndMerge(index -> {
        return Uni.createFrom().item("String" + index);
    })

使用这种方法,它应该使调用并行。现在是如何将其列入列表的问题。

分组工作正常,我使用以下代码运行它:

Random random = new Random();
    Multi<Integer> multiOfInteger = Multi.createFrom().emitter(multiEmitter -> {
        for (Integer i=0;i<1000;i++) {
            multiEmitter.emit(i);
        }
        multiEmitter.complete();
    });
    Multi<String> multiOfJokes = multiOfInteger.onItem().transformToUniAndMerge(index -> {
        if (index % 10 == 0 ) {
            Duration delay = Duration.ofMillis(random.nextInt(100) + 1);
            return Uni.createFrom().item("String " + index  + " delayed").onItem()
                    .delayIt().by(delay);
        }
        return Uni.createFrom().item("String" + index);
    }).onCompletion().invoke(() -> System.out.println("Completed"));
    Multi<List<String>> multiListJokes = multiOfJokes
            .group().intoLists().of(100)
            .onCompletion().invoke(() -> System.out.println("Completed"))
            .onItem().invoke(strings -> System.out.println(strings));
    multiListJokes.collect().asList().await().indefinitely();

您将获得您的字符串列表。

我不知道,您打算如何将列表发送到后端。但是您可以使用以下方法之一:

  • 调用(异步执行)
  • 编写自己的订阅者(实现订阅者)方法很简单。根据您的批量请求的需要。

我希望你以后能更好地理解它。

PS:链接到我学到所有这些的指南: https ://smallrye.io/smallrye-mutiny/guides

于 2021-05-06T20:58:21.713 回答
0

因此,简而言之,您希望批量并行调用服务器,而不是一次性使用所有内容。

这对你有用吗?它使用merge. 在我的示例中,它的并行度为 2。

Multi.createFrom().range(1, 10)
            .onItem()
            .transformToUni(integer -> {
                return <<my long operation Uni>>
            })
            .merge(2) //this is the concurrency 
            .collect()
            .asList();

我不确定merge今年晚些时候是否添加了,但这似乎可以满足您的需求。在我的示例中,“生成 Uni 的长操作”实际上是对 Microprofile Rest Client 的调用,该客户端生成 Uni,并返回一个字符串。之后,merge您可以放置​​另一个onItem来执行响应(在Multi之后是普通的merge),而不是将所有内容收集为列表。

于 2021-09-20T09:50:14.160 回答