3

我要求确认我对 tryProcess() 逻辑的假设。

详细说明返回值(真/假)如何影响处理器上的 DAG 工作流,该处理器具有 2 个未指定优先级的传入边。

我的假设是,如果处理器的两条边都有传入项目,并且一个 tryProcess() 返回 false,则另一边将被处理(如果该边上有更多传入项目可用)。根据哪个边缘停止处理和哪个接受它们来交替传入项目。

问题

有时会发生一个处理器实例阻塞在总是返回 false 的 tryProcess(#0) 上(因为我们希望处理来自其他边缘的新项目)。tryProcess(#0) 被重复调用,而 tryProcess(#1) 从未被调用。我确信无论是#0 还是#1 边缘都不会在处理器上调用completeEdge(),所以我希望从边缘#1 有更多的项目要处理。这通常发生在多次运行同一个 Job 之后。

为了更好地解释这个问题,这是我的用例:

用例

我的数据模型由以下对象组成

  • A:由“ida”属性标识的对象
  • B:由“idb”属性标识的对象。它使用“ida”值引用 A
  • AB:耦合B对象及其引用的A对象的对象

我需要将 B 对象与正确引用的 A 对象匹配并发出其中的几个。

我有一个具有此设置的 DAG:

顶点

  • SA:“A”类型的源项目(localParallelism(1),发出按“ida”属性排序的 A 对象)
  • SB:类型为“B”的源项目(localParallelism(1),发出按引用的“ida”属性排序的 B 个对象)
  • C-AB:将 B 对象与引用的 A 对象匹配的处理器(发出 AB 对象)

连接

  • SA -> C-AB : 传入边缘#0(未指定优先级,由“ida”属性分区)
  • SB -> C-AB : 输入边#1(未指定优先级,通过引用“ida”属性进行分区)

该环境由具有 2 个节点的 hazelcast 喷气机集群组成。

逻辑

C-AB 处理器获取第一个“A”对象(从边缘#0)并保持它直到与该“A”对象相关的所有“B”对象都被处理。如果它接收到另一个“A”对象,它会在 tryProcess(#0) 中返回 false。

当它接收到与当前“A”匹配的“B”对象(来自边缘#1)时,它会发出“AB”。

当处理器接收到一个带有下一个“A”对象的引用的“B”对象时,它会丢弃当前的“A”并等待下一个。

如果它在获得引用的“A”对象之前接收到“B”对象,如果收到新的“B”,则等待正确的“A”匹配 tryProcess(#1) 中返回的 false。

这应该可行,因为 SA 和 SB 发出正确排序的对象,并且边缘被正确分区以将具有相同“ida”值的对象发送到同一处理器。

4

1 回答 1

3

我的假设是,如果处理器的两条边都有传入项目,并且一个 tryProcess() 返回 false,则另一边将被处理(如果该边上有更多传入项目可用)。

这个假设是错误的。处理器的行为相当于

for (Object item : inbox) process(item);

但是通过协作多线程实现,这就是为什么这个循环必须能够“暂停”自己。我们通过让tryProcess()return实现暂停false

执行引擎总是从它停止的地方恢复处理器,并且在收件箱被清除之前不会尝试处理任何其他项目。收件箱本身包含从输入队列中取出的一批项目,而不是边缘将在整个作业期间传输的所有项目。

Jet 提供的解决边缘之间相互依赖的唯一机制是边缘优先级。如果您需要比这更细粒度的东西,您的处理器应该接受所有项目并在内部缓冲它们,直到满足您的进度条件。

于 2018-07-12T09:53:45.473 回答