0

考虑以下场景:

我们想要获取一个大型分布式对象集合,并且对于集合中的每个对象,我们想要启动另一个计算,该计算使用当前对象和另一个大型分布式集合来计算转换当前对象的结果。

例如

集合A:1、2、3、4、5、6、7、8……

集合 B:1,2,3,4,5,6,7,8……

对于 A 中的每个值,我们迭代 B 中的所有值,将每个值乘以 2 并对这些值求和,我们将 A 中的每个值映射到该总和乘以当前 A 值。

以下是我的尝试,当使用以下内容时会导致死锁:

c2.newJob(p2).join()

使用以下内容时不会出现死锁:

c2.newJob(p2)

,但是我们希望 p2 完成以确保我们得到正确的总和。

对于这个特定的用例,这似乎是一种使用 Jet 的非惯用方式,但是我想使用这种模式来解决其他问题,因此非常感谢您的帮助。

JetInstance jet =  Jet.newJetInstance();
JetInstance c1 =  Jet.newJetClient();

Pipeline p1 = Pipeline.create();

List<Integer> aIn =  jet.getList("a-in");
aIn.add(1);
aIn.add(2);
aIn.add(3);

p1.drawFrom(Sources.list("a-in"))
        .map(e -> {
          Pipeline p2 = Pipeline.create();
          JetInstance c2 =  Jet.newJetClient();

          List<Integer> bIn = c2.getList("b-in");
          bIn.add(1);
          bIn.add(2);
          bIn.add(3);

          p2.drawFrom(Sources.list("b-in"))
                  .map(i->((Integer)i)*2)
                  .drainTo(Sinks.list("b-out"));

          List<Integer> bOut = c2.getList("b-out");

          // I would have thought it should just wait for the computation to complete,
          // instead the join here causes jet to block itself,
          c2.newJob(p2).join();

          int sum = 0;
          for (Integer i : bOut){
            sum+=i;
          }

          return ((Integer)e)*sum;
        }).drainTo(Sinks.list("a-out"));
c1.newJob(p1).join();
4

2 回答 2

2

您的代码中有多个问题:

  1. map功能不应阻塞。在即将发布的版本中,我们将添加mapUsingContextAsync您可以将客户端连接用作上下文、提交作业并返回的位置job.getFuture()

  2. 这些map操作将并行运行。您需要确保他们不共享临时列表。在您的示例中,所有子作业都使用b-out并且它们会覆盖彼此的数据。

  3. 死锁的原因是:join()inmap()阻塞了协作工作线程,正在等待子作业完成,但由于协作工作线程阻塞,子作业无法完成。

此外,Jet 并未针对非常小的批量作业进行优化,但我猜您的实际作业更大。部署这项工作有相当多的开销;如果作业本身只运行几毫秒,则开销很大。在这种特定情况下,您最好只使用list.stream().map(i->i*2).sum()而不是子作业。

JetInstance jet = Jet.newJetInstance();
JetInstance c1 = Jet.newJetClient();

Pipeline p1 = Pipeline.create();

List<Integer> aIn = jet.getList("a-in");
aIn.add(1);
aIn.add(2);
aIn.add(3);

List<Integer> bIn = jet.getList("b-in");
bIn.add(1);
bIn.add(2);
bIn.add(3);

p1.drawFrom(Sources.list("a-in"))
  .mapUsingContextAsync(
          ContextFactory
                  .withCreateFn(inst -> tuple2(inst, inst.<UUID, Long>getMap("tmpResults")))
                  // mark as non-cooperative, job submission does some blocking
                  .toNonCooperative()
                  .withLocalSharing()
                  .withMaxPendingCallsPerProcessor(2)
                  .withDestroyFn(ctx -> ctx.f1().destroy()),
          (ctx, item) -> {
              Pipeline p2 = Pipeline.create();
              JetInstance instance = ctx.f0();
              UUID key = UUID.randomUUID();
              IMapJet<UUID, Long> tmpResultsMap = ctx.f1();

              p2.drawFrom(Sources.list("b-in"))
                .map(i -> ((Integer) i) * 2L)
                .aggregate(summingLong(Long::longValue))
                .map(sum -> entry(key, sum))
                .drainTo(Sinks.map(tmpResultsMap));

              return instance.newJob(p2).getFuture()
                             .thenApply(r -> entry(item, tmpResultsMap.remove(key)));
          })
  .drainTo(Sinks.list("a-out"));

c1.newJob(p1).join();
jet.getList("a-out").forEach(System.out::println);

这将打印以下输出:

1=12
2=12
3=12

上面的代码在当前快照中工作,应该在几周后到期的 Jet 3.0 中工作。

于 2019-03-14T08:07:24.650 回答
1

@newlogic,试试这个方法:

  1. 创建一个读取b-in和写入b-out地图的作业,而不是列表。您可以使用已知键或仅使用时间戳等作为键并在该表上定义 TTL 以删除旧结果。
  2. 在表上创建一个侦听b-out器(本地侦听器,因此仅通知持有更新密钥的节点)以侦听 entryAdded/Updated 事件,这取决于您在第一步中选择的内容并从该侦听器方法提交一个新作业进行处理a-in

这样,您无需等待,一旦第一个作业完成,它会自动触发第二个作业。

于 2019-03-13T21:20:43.887 回答