9

我一直在尝试设置呼吸机/工作人员/接收器模式以抓取页面,但我从未通过测试阶段。我的设置的一个特殊之处是水槽与呼吸机处于相同的过程中。所有节点都使用 ipc:// 传输。目前只交换测试消息。呼吸机发送任务,工作人员接收并等待,然后向接收器发送确认。

症状:一段时间后(通常少于 5 分钟),接收器停止接收确认消息,即使呼吸机继续发送任务并且工作人员继续接收它们并发送确认消息。

我知道发送了确认,因为如果我重新启动接收器,它会在启动时获取所有丢失的消息。

我认为 ZeroMQ 处理自动重新连接。

呼吸机/水槽

var push = zmq.socket('push');
var sink = zmq.socket('pull');
var pi = 0;
setInterval(function() {
    push.send(['ping', pi++], zmq.ZMQ_SNDMORE);
    push.send('end');
}, 2000);
push.bind('ipc://crawl.ipc');
sink.bind('ipc://crawl-sink.ipc');
sink.on('message', function() {
    var args = [].slice.apply(arguments).map(function(e) {return e.toString()});
    console.log('got message', args.join(' '));
});

worker.js

var pull = zmq.socket('pull');
var sink = zmq.socket('push');
sink.connect(opt.sink);
pull.connect(opt.push);

pull.on('message', function() {
    var args = [].slice.apply(arguments).map(function(e) {return e.toString()});
    console.log('got job ', args.join(' '));
    setTimeout(function() {
        console.log('job done ', args.join(' '));
        sink.send(['job done', args.join(' ')]);
    }, Math.random() * 5 * 1000);
});

编辑我尝试将接收器移动到另一个进程,它似乎工作。但是,我真的希望它生活在同一个进程中,并且在每个进程处理多个 zmq 套接字时,我观察到了类似的行为,而不管使用的模式如何

编辑我正在使用这个模块https://github.com/JustinTulloss/zeromq.node

4

2 回答 2

4

我不一定期望这个答案会被接受,但我将其放在这里以供参考。有一个非常忠实的仅节点模块,称为Axon,它的灵感来自 ZeroMQ。

  • Axon 没有编译依赖,并重新创建与 ZeroMQ 相同的套接字类型。
  • Axon 还基于 pub/sub 套接字类型来创建网络事件发射器。
  • 最后,ZMQ 的 req/rep 套接字不适用于 Node.js,因为 ZMQ 期望回复同步发生。作为原生节点,Axon 库可以正确处理 req/rep 模式。

注意: ZMQ 和 Axon 不可互操作。

于 2013-02-05T17:16:11.353 回答
2

不确定您使用的是基本 AMQP 客户端,还是使用它的包,我在使用 RabbitMQ 时遇到了类似的问题。实际进程仍在运行(setInterval 有效)

我正在通过主进程中的 cluster.fork 运行我的服务/工作人员......主进程中有监听器在退出时重新启动工作人员/服务......在我的工作人员内部,我有一个每 X 秒运行一次的 setInterval ,如果在那段时间没有完成任何工作,我有我的工作进程.exit(主进程侦听器将启动一个新的分叉)。这对我来说是足够的弹性。通过让几个工作人员运行(侦听队列),工作仍然可以完成。

正如另一位建议的那样,我一直在考虑切换到 Axon,因为我与 MQ 的所有接口目前都通过 Node.js 进行。我的其他系统通过 NodeJS 驱动的 API 服务进行交互。就此而言,通过 API 服务公开您可能需要的内容可能不会太难。

于 2013-03-05T20:46:53.343 回答