我一直在尝试设置呼吸机/工作人员/接收器模式以抓取页面,但我从未通过测试阶段。我的设置的一个特殊之处是水槽与呼吸机处于相同的过程中。所有节点都使用 ipc:// 传输。目前只交换测试消息。呼吸机发送任务,工作人员接收并等待,然后向接收器发送确认。
症状:一段时间后(通常少于 5 分钟),接收器停止接收确认消息,即使呼吸机继续发送任务并且工作人员继续接收它们并发送确认消息。
我知道发送了确认,因为如果我重新启动接收器,它会在启动时获取所有丢失的消息。
我认为 ZeroMQ 处理自动重新连接。
呼吸机/水槽
var push = zmq.socket('push');
var sink = zmq.socket('pull');
var pi = 0;
setInterval(function() {
push.send(['ping', pi++], zmq.ZMQ_SNDMORE);
push.send('end');
}, 2000);
push.bind('ipc://crawl.ipc');
sink.bind('ipc://crawl-sink.ipc');
sink.on('message', function() {
var args = [].slice.apply(arguments).map(function(e) {return e.toString()});
console.log('got message', args.join(' '));
});
worker.js
var pull = zmq.socket('pull');
var sink = zmq.socket('push');
sink.connect(opt.sink);
pull.connect(opt.push);
pull.on('message', function() {
var args = [].slice.apply(arguments).map(function(e) {return e.toString()});
console.log('got job ', args.join(' '));
setTimeout(function() {
console.log('job done ', args.join(' '));
sink.send(['job done', args.join(' ')]);
}, Math.random() * 5 * 1000);
});
编辑我尝试将接收器移动到另一个进程,它似乎工作。但是,我真的希望它生活在同一个进程中,并且在每个进程处理多个 zmq 套接字时,我观察到了类似的行为,而不管使用的模式如何
编辑我正在使用这个模块https://github.com/JustinTulloss/zeromq.node