考虑以下场景:
我们想要获取一个大型分布式对象集合,并且对于集合中的每个对象,我们想要启动另一个计算,该计算使用当前对象和另一个大型分布式集合来计算转换当前对象的结果。
例如
集合A:1、2、3、4、5、6、7、8……
集合 B:1,2,3,4,5,6,7,8……
对于 A 中的每个值,我们迭代 B 中的所有值,将每个值乘以 2 并对这些值求和,我们将 A 中的每个值映射到该总和乘以当前 A 值。
以下是我的尝试,当使用以下内容时会导致死锁:
c2.newJob(p2).join()
使用以下内容时不会出现死锁:
c2.newJob(p2)
,但是我们希望 p2 完成以确保我们得到正确的总和。
对于这个特定的用例,这似乎是一种使用 Jet 的非惯用方式,但是我想使用这种模式来解决其他问题,因此非常感谢您的帮助。
JetInstance jet = Jet.newJetInstance();
JetInstance c1 = Jet.newJetClient();
Pipeline p1 = Pipeline.create();
List<Integer> aIn = jet.getList("a-in");
aIn.add(1);
aIn.add(2);
aIn.add(3);
p1.drawFrom(Sources.list("a-in"))
.map(e -> {
Pipeline p2 = Pipeline.create();
JetInstance c2 = Jet.newJetClient();
List<Integer> bIn = c2.getList("b-in");
bIn.add(1);
bIn.add(2);
bIn.add(3);
p2.drawFrom(Sources.list("b-in"))
.map(i->((Integer)i)*2)
.drainTo(Sinks.list("b-out"));
List<Integer> bOut = c2.getList("b-out");
// I would have thought it should just wait for the computation to complete,
// instead the join here causes jet to block itself,
c2.newJob(p2).join();
int sum = 0;
for (Integer i : bOut){
sum+=i;
}
return ((Integer)e)*sum;
}).drainTo(Sinks.list("a-out"));
c1.newJob(p1).join();