0

我正在尝试从单个进程(P1)获取输出并使用其他进程(P2 和 P3)对其执行并行任务。到目前为止这么简单。

为此,我将 P2 和 P3 连接到 P1 的单个输出端口。在我看来,这应该意味着 P1 通过其输出端口发出数据包,这些数据包由 P2 和 P3 同时并行接收。

我发现 P2 和 P3 不是并行启动的,而是其中一个进程将等到另一个进程完成处理(或者至少在我看来是这样)。

例如,这是一个简单的图表,它应该接受 JSON 输入,然后同时获取时间戳并解析 JSON。解析 JSON 后会获取另一个时间戳,这用作计算 JSON 解析所用时间的基本方法。

在此处输入图像描述

请注意从ajax/Get输出端口发出的连接的顺序(时间戳连接是最后添加的)。

在这种情况下,时间戳的差异约为 5 毫秒,这与 JSON 解析在非 NoFlo 环境中所花费的时间大致一致(由于某种原因,在 NoFlo 中实际上要长一点)。

现在使用相同的图表,但这次来自ajax/Get输出端口的连接顺序发生了变化(最后添加了解析连接):

在此处输入图像描述

这次时间戳之间的差异大约为 40-50 毫秒,这显然是一个巨大的差异,并且远远大于解析在 NoFlo 之外所采用的差异。

如果有人能对以下内容有所了解,我将不胜感激:

  • 为什么根据连接顺序,时间会如此不同?
  • 如何确保来自的 2 个连接ajax/Get被触发并并行运行(即它们不会相互等待)?

如果有帮助,这里是来自 FlowHub 的图形的 JSON 导出

我还使用 CLI 整理了一个简单的图表,并设法更好地了解图表的流程,并可能对可能导致这种情况的原因有所了解:

# This executes in the correct order, though likely by
# coincidence and not due to true parallelisation.
#
# Time1 is run and outputted before Time2.
#
Read(filesystem/ReadFile) OUT -> IN Time1(objects/GetCurrentTimestamp)
Read OUT -> IN Parse(strings/ParseJson)

# This executes the entire Parse path before going back to grab
# and output Time1.
#
# Time1 is run and outputted *after* Time2
# Read doesn't send a disconnect message to Parse until *after*
# Time 1 is outputted.
#
# Read doesn't send a disconnect message to Time1 until *after*
# the Parse path has finished disconnecting.
#
# Read(filesystem/ReadFile) OUT -> IN Parse(strings/ParseJson)
# Read OUT -> IN Time1(objects/GetCurrentTimestamp)

Time1 OUT -> IN Display1(core/Output)

Parse OUT -> IN Time2(objects/GetCurrentTimestamp)
Time2 OUT -> IN Display2(core/Output)

'sample.geojson' -> IN Read

当使用之前定义的ReadtoTime1连接运行时ReadParse网络是有序的,尽管我注意到Read在触发断开连接之前等到其他所有内容都完成(对吗?):

DATA -> ENCODING Read() CONN
DATA -> ENCODING Read() DATA
DATA -> ENCODING Read() DISC
DATA -> IN Read() CONN
DATA -> IN Read() DATA
DATA -> IN Read() DISC
Read() OUT -> IN Time1() CONN
Read() OUT -> IN Time1() < sample.geojson
Read() OUT -> IN Parse() CONN
Read() OUT -> IN Parse() < sample.geojson
Parse() OUT -> IN Time2() CONN
Parse() OUT -> IN Time2() < sample.geojson
Read() OUT -> IN Time1() DATA
Time1() OUT -> IN Display1() CONN
Time1() OUT -> IN Display1() DATA
1422549101639
Read() OUT -> IN Parse() DATA
Parse() OUT -> IN Time2() DATA
Time2() OUT -> IN Display2() CONN
Time2() OUT -> IN Display2() DATA
1422549101647
Read() OUT -> IN Time1() > sample.geojson
Read() OUT -> IN Parse() > sample.geojson
Parse() OUT -> IN Time2() > sample.geojson
Read() OUT -> IN Time1() DISC
Time1() OUT -> IN Display1() DISC
Read() OUT -> IN Parse() DISC
Parse() OUT -> IN Time2() DISC
Time2() OUT -> IN Display2() DISC

如果我切换顺序以便首先定义Readto连接,那么一切都会出错,甚至在整个路径完成之前都不会发送数据包(实际上是现在之后):ParseTime1ReadParseTime1 Time2

DATA -> ENCODING Read() CONN
DATA -> ENCODING Read() DATA
DATA -> ENCODING Read() DISC
DATA -> IN Read() CONN
DATA -> IN Read() DATA
DATA -> IN Read() DISC
Read() OUT -> IN Parse() CONN
Read() OUT -> IN Parse() < sample.geojson
Parse() OUT -> IN Time2() CONN
Parse() OUT -> IN Time2() < sample.geojson
Read() OUT -> IN Time1() CONN
Read() OUT -> IN Time1() < sample.geojson
Read() OUT -> IN Parse() DATA
Parse() OUT -> IN Time2() DATA
Time2() OUT -> IN Display2() CONN
Time2() OUT -> IN Display2() DATA
1422549406952
Read() OUT -> IN Time1() DATA
Time1() OUT -> IN Display1() CONN
Time1() OUT -> IN Display1() DATA
1422549406954
Read() OUT -> IN Parse() > sample.geojson
Parse() OUT -> IN Time2() > sample.geojson
Read() OUT -> IN Time1() > sample.geojson
Read() OUT -> IN Parse() DISC
Parse() OUT -> IN Time2() DISC
Time2() OUT -> IN Display2() DISC
Read() OUT -> IN Time1() DISC
Time1() OUT -> IN Display1() DISC

如果这是正确的行为,那么如何在不阻塞另一个分支的情况下并行运行 2 个分支?

我尝试过使每个组件异步,我已经尝试过这两种方法并使用 WirePattern,我尝试过创建多个输出端口并一次通过所有这些端口发送数据。没有乐趣——它总是归结为第一条边的连接顺序。我正在用它来拔头发,因为它完全阻止了我在 ViziCities 中使用 NoFlo :(

4

2 回答 2

0

由于 JavaScript 引擎的单线程特性,NoFlo 无法并行执行多项操作。I/O 调用在它们自己的线程中运行,但它们的回调总是将我们返回到运行 NoFlo 的主线程。

在 NoFlo 中,只要我们处理同步组件(就像ajax/Get您的示例图中的其他所有组件一样),我们就会执行深度优先以确保快速吞吐量。

这意味着从第一个出站连接的子流首先ajax/Get运行到完成,然后是第二个。

您希望在这里发生的是广度优先执行而不是深度优先。已经有一些关于通过边缘元数据启用它的讨论,但在此之前,一种方法是core/RepeatAsync在感兴趣的连接之间添加节点。

从长远来看,另一种有趣的方法是通过RemoteSubgraph和 Web Workers 在他们自己的线程中运行部分流。从理论上讲,我们甚至可以在自己的线程中运行每个进程,从而与经典 FBP 完全兼容。但这将付出高昂的启动成本。

于 2015-01-30T02:55:53.337 回答
0

我不会考虑浏览器示​​例,正如 bergie 解释的那样,由于浏览器端 JavaScript 的工作方式,它确实是深度优先的。

不过 CLI 示例更有趣,因为 noflo-nodejs 广泛使用 EventEmitters。它仍然不是真正的并行,但它更具并发性。

我们在这里看到的是以下情况的副作用:

  1. 事件按其发生的顺序进行处理。
  2. 图中定义分支的顺序会影响事件发生的顺序。
  3. 大多数组件是由data事件触发的,而不是disconnect. 他们不等待disconnect处理data并发送结果。

总的来说,它解释了为什么第一个分支在第二个分支之前执行,以及为什么在所有数据都已经处理之后断开连接。

这里可能会给你一个纯同步的印象,但是尽管上面列出了这些事实,系统仍然是并发的。如果Read以适当的速度发送多个数据包,您会看到分支 1 和分支 2 的事件混合在一起。

更新:

这是将同步组件转换为异步的常用技巧:

setTimeout(function() {
  doTheSyncJob(); // actual code here
  callback();
}, 0);
于 2015-01-30T11:55:13.227 回答