我要求确认我对 tryProcess() 逻辑的假设。
详细说明返回值(真/假)如何影响处理器上的 DAG 工作流,该处理器具有 2 个未指定优先级的传入边。
我的假设是,如果处理器的两条边都有传入项目,并且一个 tryProcess() 返回 false,则另一边将被处理(如果该边上有更多传入项目可用)。根据哪个边缘停止处理和哪个接受它们来交替传入项目。
问题
有时会发生一个处理器实例阻塞在总是返回 false 的 tryProcess(#0) 上(因为我们希望处理来自其他边缘的新项目)。tryProcess(#0) 被重复调用,而 tryProcess(#1) 从未被调用。我确信无论是#0 还是#1 边缘都不会在处理器上调用completeEdge(),所以我希望从边缘#1 有更多的项目要处理。这通常发生在多次运行同一个 Job 之后。
为了更好地解释这个问题,这是我的用例:
用例
我的数据模型由以下对象组成
- A:由“ida”属性标识的对象
- B:由“idb”属性标识的对象。它使用“ida”值引用 A
- AB:耦合B对象及其引用的A对象的对象
我需要将 B 对象与正确引用的 A 对象匹配并发出其中的几个。
我有一个具有此设置的 DAG:
顶点
- SA:“A”类型的源项目(localParallelism(1),发出按“ida”属性排序的 A 对象)
- SB:类型为“B”的源项目(localParallelism(1),发出按引用的“ida”属性排序的 B 个对象)
- C-AB:将 B 对象与引用的 A 对象匹配的处理器(发出 AB 对象)
连接
- SA -> C-AB : 传入边缘#0(未指定优先级,由“ida”属性分区)
- SB -> C-AB : 输入边#1(未指定优先级,通过引用“ida”属性进行分区)
该环境由具有 2 个节点的 hazelcast 喷气机集群组成。
逻辑
C-AB 处理器获取第一个“A”对象(从边缘#0)并保持它直到与该“A”对象相关的所有“B”对象都被处理。如果它接收到另一个“A”对象,它会在 tryProcess(#0) 中返回 false。
当它接收到与当前“A”匹配的“B”对象(来自边缘#1)时,它会发出“AB”。
当处理器接收到一个带有下一个“A”对象的引用的“B”对象时,它会丢弃当前的“A”并等待下一个。
如果它在获得引用的“A”对象之前接收到“B”对象,如果收到新的“B”,则等待正确的“A”匹配 tryProcess(#1) 中返回的 false。
这应该可行,因为 SA 和 SB 发出正确排序的对象,并且边缘被正确分区以将具有相同“ida”值的对象发送到同一处理器。