用例
HazelcastJet 版本 0.6.1 Hazelcast 版本 3.10.2
鉴于 DAG 的这个(简化版)
顶点
S1 发出 5 个类型 A 项目的源(从带分区的数据库中读取)本地并行度 = 1
发出 150K 类型 B 项目的S2源(从 DB 中批量读取 100 个分区的迭代器) 本地并行度 = 1
AD 处理器,适配 A->A1 和 B->B1 类型并一一发出
FA Processors.filterP 仅接受 A1 类型的项目并一一发出
FB Processors.filterP 仅接受 B1 类型的项目并一一发出
CL 处理器首先累积所有 A1 类型的项目,然后当它收到 B1 类型的项目时,用从适当的 A1 获得的一些人员来丰富它,然后一个接一个地发出。
写入 B1 的WR Sink 本地并行度 = 1
注意: 只是为了给过滤器处理器赋予意义:在 DAG 中有其他源流入同一个适配器 AD,然后使用过滤器处理器进入其他路径。
边缘
S1 --> 广告
S2 --> 广告
AD --> FA (从序数 0)
AD --> FB (从序数 1)
FA --> CL(到序号 0,优先级 0 分发和广播)
FB --> CL(到序号 1,优先级为 1)
CL --> 写
问题
如果源 S2 有“少量”要加载的项目(即 15K),则 emitFromTraverser 永远不会返回 false。
如果源 S2 有“许多”项要加载(即 150K),则 emitFromTraverser 在以下情况下返回 false:
- 所有 A1 项目已由 CL 处理
- 大约 30% 的 B1 项目已经传输到 CL,但没有一个被 CL 处理(DiagnosticProcessor 记录该元素已发送到 CL 但未处理)
S2代码供参考:
protected void init(Context context) throws Exception {
super.init(context);
this.iterator = new BQueryIterator(querySupplier, batchSize);
this.traverser = Traversers.traverseIterator(this.iterator);
}
public boolean complete() {
boolean result = emitFromTraverser(this.traverser);
return result;
}
问题
- CL 在源代码结束之前不处理项目是否正确?
- 在 CL Vertex 上使用优先级 + 分布式 + 广播是否正确?
更新
似乎从未调用过 CL 边缘 1 上的 completeEdge。有人可以告诉我为什么吗?
谢谢!