3

我目前正在使用子进程以非阻塞方式运行一些计算功能。

我遇到了一个问题,我需要能够将计算结果与父进程中的正确回调联系起来。例如:

var cp = require('child_process');
var n = cp.fork(__dirname + '/child.js');

exports.evaluate = function(data, callback) {

    n.send({ data : data});

    n.once('message', function(result) {

        callback(result);

    }

}

这个例子的问题是,如果一个计算在另一个之前返回,那么它将不会收到正确的结果。

有什么方法可以使用自定义事件名称而不是“消息”,以确保每次调用评估函数时都会创建一个唯一的侦听器,一旦调用该侦听器就会被删除?

如何从子进程发出自定义事件?

4

3 回答 3

7

使用 .创建一个新的 EventEmitter new events.EventEmitter。然后根据eventType:"someType"您收到的任何消息发出一个事件。

即类似的东西

 events = require("events")     
 emitter = new events.EventEmitter
 n.on("message", function(msg) { emitter.emit(msg.eventType,msg.body) })

然后将侦听器(使用onceor on)绑定到该事件发射器。

于 2013-05-12T16:57:10.973 回答
0

我认为一个不错的方法可能是递归调用您的评估函数,直到所有计算完成。使用队列进行计算并以 FIFO 方式处理它们。

var computations = [];

您需要放弃 n.once('message' 并在评估函数之外定义一个 n.on('message' 处理程序。您收到的每个“消息”都与您的评估数据类型匹配(我们称之为“结果”) . 然后您将该结果保存在结果的 FIFO 队列中。之后,您将检查是否有更多计算,然后再次调用评估。如果您可以清理这以拼接出评估函数中最旧的计算会的。

var results = [];

n.on('message', function(m) {
  if(m.msg === "result") { // message looks like {msg: "result", data: 1234}
    results.push(m.data);
  }
  if(computations.length > 0) {
    var comp = computations[0]; // Save oldest computation
    computations.splice(0,1); // Remove oldest computation from array
    evaluate(comp); // Evaluate oldest computation
  }
  else
    process_results(); // if you have 0 computations left, 
                       // you got the last result so go process them.
});

在调用评估之前,您应该将计算推入队列,并发送最旧的进行评估。

if(computations.length > 0) {
  computations.push(new_comp); // Push on your newest compuation
  var comp = computations[0]; // Save oldest computation
  computations.splice(0,1); // Remove oldest computation from array
  evaluate(comp); // Evaluate oldest computation
}
else {
  evaluate(new_comp); // Evaluate computation
}
evaluate(new_comp); // Evaluate oldest computation

您的“DONE”标志将在 computes.length === 0 时出现。这自然会使用上面的代码自行处理。任何时候你在 n.on('message', handler.. 中剩下 0 个计算,你都可以调用你的结果处理函数:

要处理您的结果,它是一个简单的 for 循环。

function process_results() {
    if(results.length > 0)
      for(x in results)
        console.log("New result: "+x);
}
于 2013-05-12T23:14:15.410 回答
0

这可能不是严格“正确”的,并且为时已晚 6 年以上,但似乎在 2020 年有效

> process.version
'v12.14.1'
> var ignore = process.on('custom',function(e){console.dir(e,{depth:null});})
undefined
> process.emit('custom',{e:"hello"})
{ e: 'hello' }
true
> 

即,任何传递给 a 的对象process.emit(x,obj)都会发送到针对匹配process.on(x,function(obj){ /* ... */ })回调注册的任何对象

于 2020-01-22T19:49:20.700 回答