2

假设我们有:

  • 一个 URL 列表,它是我们 Multi 的来源
  • 作为第一步,我们使用 HTTP 客户端调用获取此页面的 HTML
  • 然后我们尝试找到一些特定的标签并获取它的内容
  • 然后我们将找到的东西存储到数据库中

现在我们在这里有 3 个步骤。有没有办法可以并行运行这些步骤?我的意思是一段时间后它应该:抓取 HTML 并同时处理 html + 获取标签内容,同时将数据从已处理的项目中保存到数据库中。(希望我的意思很明显)这样我们就可以进行并行处理。默认情况下,我可以看到,mutiny 以串行方式进行。

这是一个例子:

  @Test
  public void test3() {
    Multi<String> source = Multi.createFrom().items("a", "b", "c");
    source
            .onItem().transform(i -> trans(i, "-step1"))
            .onItem().transform(i -> trans(i, "-step2"))
            .onItem().transform(i -> trans(i, "-step3"))
            .subscribe().with(item -> System.out.println("Subscriber received " + item));
  }


  private String trans(String s, String add) {
    int t = new Random().nextInt(4) * 1000;
    try {
      print("Sleeping for '" + s + "' miliseconds: " + t);
      Thread.sleep(t);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

    return s + add;
  }

现在这会报告以下控制台输出:

Sleeping for 'a' miliseconds: 2000
Sleeping for 'a-step1' miliseconds: 3000
Sleeping for 'a-step1-step2' miliseconds: 3000
Subscriber received a-step1-step2-step3
Sleeping for 'b' miliseconds: 0
Sleeping for 'b-step1' miliseconds: 0
Sleeping for 'b-step1-step2' miliseconds: 0
Subscriber received b-step1-step2-step3
Sleeping for 'c' miliseconds: 1000
Sleeping for 'c-step1' miliseconds: 3000
Sleeping for 'c-step1-step2' miliseconds: 3000
Subscriber received c-step1-step2-step3

可以看到它没有并行运行。我在这里错过了什么?

4

2 回答 2

2

这是预期的,Multi将项目作为流处理。

如果您想进行并行操作(例如,启动 10 个 HTTP 请求),您应该组合Uni,请参阅https://smallrye.io/smallrye-mutiny/guides/combining-items

于 2021-01-23T10:52:39.430 回答
2

正如@jponge 提到的,您可以收集一些物品List<Uni<String>> ,然后致电

Uni.combine().all().unis(listOfUnis).onitem().subscribe().with()

List<Uni<String>> listOfUnis = new ArrayList<>();
    Multi<String> source = Multi.createFrom().items("a", "b", "c");
    source
            .onItem().invoke(i -> listOfUnis.add(trans(i, "-step1")))
            .onItem().invoke(i -> listOfUnis.add(trans(i, "-step2")))
            .onItem().invoke(i -> listOfUnis.add(trans(i, "-step3")))
// do not subscribe on Multis here

这里还有一个注意事项 - 如果您要进行 HTTP 调用,最好添加

.emitOn(someBlockingPoolExecutor)

因为你不想阻塞等待 http 调用完成的 Netty 线程

于 2021-02-03T14:10:47.567 回答