0

用例

HazelcastJet 版本 0.6.1 Hazelcast 版本 3.10.2

鉴于 DAG 的这个(简化版)

顶点

S1 发出 5 个类型 A 项目的源(从带分区的数据库中读取)本地并行度 = 1

发出 150K 类型 B 项目的S2源(从 DB 中批量读取 100 个分区的迭代器) 本地并行度 = 1

AD 处理器,适配 A->A1 和 B->B1 类型并一一发出

FA Processors.filterP 仅接受 A1 类型的项目并一一发出

FB Processors.filterP 仅接受 B1 类型的项目并一一发出

CL 处理器首先累积所有 A1 类型的项目,然后当它收到 B1 类型的项目时,用从适当的 A1 获得的一些人员来丰富它,然后一个接一个地发出。

写入 B1 的WR Sink 本地并行度 = 1

注意: 只是为了给过滤器处理器赋予意义:在 DAG 中有其他源流入同一个适配器 AD,然后使用过滤器处理器进入其他路径。

边缘

S1 --> 广告

S2 --> 广告

AD --> FA (从序数 0)

AD --> FB (从序数 1)

FA --> CL(到序号 0,优先级 0 分发和广播)

FB --> CL(到序号 1,优先级为 1)

CL --> 写

问题

如果源 S2 有“少量”要加载的项目(即 15K),则 emitFromTraverser 永远不会返回 false。

如果源 S2 有“许多”项要加载(即 150K),则 emitFromTraverser 在以下情况下返回 false:

  • 所有 A1 项目已由 CL 处理
  • 大约 30% 的 B1 项目已经传输到 CL,但没有一个被 CL 处理(DiagnosticProcessor 记录该元素已发送到 CL 但未处理)

S2代码供参考:

protected void init(Context context) throws Exception {
    super.init(context);
    this.iterator = new BQueryIterator(querySupplier, batchSize);
    this.traverser = Traversers.traverseIterator(this.iterator);
}

public boolean complete() {
    boolean result = emitFromTraverser(this.traverser);
    return result;
}

问题

  • CL 在源代码结束之前不处理项目是否正确?
  • 在 CL Vertex 上使用优先级 + 分布式 + 广播是否正确?

更新

似乎从未调用过 CL 边缘 1 上的 completeEdge。有人可以告诉我为什么吗?

谢谢!

4

2 回答 2

2

您遭受由优先级引起的死锁。您的 DAG 从 AD 分支,然后在 CL 中重新加入,但具有优先级。

AD --+---- FA ----+-- CL
      \          /
       +-- FB --+

设置优先级会导致在处理来自高优先级边缘的所有项目之前,不处理来自低优先级边缘的项目。AD最终会被来自低优先级路径的背压阻塞,而CL. SoAD被阻塞,因为它不能发送到较低优先级的边缘,而 CL 被阻塞,因为它仍在等待来自较高优先级边缘的项目,从而导致死锁。

在您的情况下,您可以通过制作 2 个顶点来解决它AD,每个顶点都处理来自以下来源之一的项目:

S1 --- AD1 ----+--- CL
              /
S2 --- AD2 --+
于 2018-09-28T17:41:26.533 回答
0

过了一会儿,我明白了问题所在...

CL 处理器无法知道所有 A1 项目何时已被处理,因为所有项目都来自 AD 处理器。所以在开始处理 B1 项目之前,它需要等待所有来自 AD 的源。

不确定,但可能在加载大量项目 B 后,DAG 中的所有收件箱缓冲区都已满,无法接受来自 S2 的任何其他 B,但同时无法处理 B1 项目以继续:这就是死锁。

也许 DAG 能够检测到这一点?我对 Jet 的了解并不深,但如果有这个警告就好了。

也许有一些日志记录可以启用?

我希望有人可以确认我的答案并建议如何改进和检测这些问题。

于 2018-09-28T12:46:59.693 回答