在 NoFlo 中,我经常遇到这样的组件:
noflo = require 'noflo'
class Foo extends noflo.AsyncComponent
constructor: ->
@inPorts = new noflo.InPorts
main:
datatype: 'int'
description: 'Main async input'
required: true
sup1:
datatype: 'string'
description: 'Supplementary input #1'
required: true
sup2:
datatype: 'int'
description: 'Supplementary input #2'
required: true
@outPorts = new noflo.OutPorts
out:
datatype: 'object'
description: 'Result object'
error:
datatype: 'object'
@sup1 = null
@sup2 = null
@inPorts.sup1.on 'data', (@sup1) =>
@inPorts.sup2.on 'data', (@sup2) =>
super 'main', 'out'
doAsync: (main, callback) ->
unless @sup1 and @sup2
return callback new Error "Supplementary data missing"
# Combine data received from different sources
result =
main: main
sup1: @sup1
sup2: @sup2
# Reset state until next iteration
@sup1 = null
@sup2 = null
# Send the result
@outPorts.out.send result
@outPorts.out.disconnect()
callback()
exports.getComponent = -> new Foo
它假设所有 3 个输入连接都以某种方式同步,尽管网络主要由异步组件组成。考虑这种情况:Foo
等待main
到来并接收sup1
数据sup2
包,但随后下一个sup1
数据包到达,应该与 next 结合main
,同时仍在等待上一个数据包main
的到来。结果将是更多或更高数据吞吐量的完全混乱。
NoFlo 异步组件是否有任何数据竞争保护手段,还是完全取决于组件设计者?
这里有两个问题:同步输入和维护内部状态。内部状态或多或少受到 Node.js 不是多线程的事实的保护,并且在前一个doAsync()
处理程序完成之前没有任何东西会尝试访问状态变量。但是同步输入仍然是一个问题。