1

是否有一种通用模式用于使 0mq pub/sub 中的发布者在节点中冗余?其动机是能够与可能会失败/定期重新启动的发布者一起运行多个进程。

我最初的想法是在主服务器中创建一个转发器并从工作发布者连接到它:

var cluster = require('cluster')
  , zmq = require('zmq')
  , endpointIn = 'ipc:///tmp/cluster_pub_sub'
  , endpointOut = 'tcp://127.0.0.1:7777';

if (cluster.isMaster) {
  for (var i = 0; i < 2; i++) cluster.fork();
  startPubSubForwarder();
} else {
  startPublisher();
}

function startPublisher() {
  var socket = zmq.socket('pub');
  socket.connect(endpointIn);
  setInterval(function () {
    socket.send('pid=' + process.pid);
  }, 1000);
}

function startPubSubForwarder() {
  var sIn = zmq.socket('sub')
    , sOut = zmq.socket('pub');

  // incoming
  sIn.subscribe('');
  sIn.bind(endpointIn, function (err) {
    if (err) throw err;
  });
  sIn.on('message', function (data) {
    sOut.send(data);
  });

  // outgoing
  sOut.bind(endpointOut, function (err) {
    if (err) throw err;
  });
}

还有其他/更好的方法吗?

4

2 回答 2

0

如果您关心的是消息的持久性,我认为您不太关心拥有多个发布者,而是更关心确保在您的发布者死亡时消息不会丢失。您可以立即重新启动发布者,然后从中断的地方继续。您还需要知道哪些消息已成功发送。

这需要的是 1) 持久存储和 2) 以及一种向发布者确认消息已在接收者端收到(并且可能处理已完成)的方法。此设置应该可以解决您的可靠性需求。

如果您还想实现高规模,那么您需要稍微扩充一下架构。对于发送者和接收者为 1:1 的发送/接收场景,它更直接,而当您需要执行 1:N 循环/负载分配场景时,这可能更复杂一些,这可能是您需要的规模。

我对完成横向扩展方案的投入是进行以下设置:

sender_process--(1:1)-->distributor--(1:N)-->receiver_process(es)

分发者确认收到来自发送者的消息,然后扇出到接收者进程。

您可能希望通过在每个进程前面放置一个队列来完成此操作。因此,您不要发送到流程。您发送到进程从中读取的队列。发件人将东西放在分发者队列中。分发者将东西放在接收者的队列中。在每个点上,每个进程都会尝试处理。如果它在最大重试次数之后失败,它将进入错误队列。

我们使用 rabbitmq / amqp 来完成这一切。我已经开始开源我们用来进行 1:1 和 1:N 发送的总线:https ://github.com/mateodelnorte/servicebus 。在接下来的几天里,我将添加自述文件和更多测试。

于 2012-06-28T18:56:11.873 回答
0

从您的示例代码中,我认为 XPUB/XSUB 0MQ 模式最适合您。这是实现相同“startPubSubForwarder()”块的一种更有效的方法,而且您的订阅者可以直接在发布者后端订阅某些模式。在这里,我留下了一个发布者/xpub-xsub(以代理方式)/订阅者示例的链接:https ://github.com/krakatoa/node_zmq_workshop/tree/master/03.3_news_proxy 。它是 NodeJS 代码(它是我的,不介意询问细节!)。

于 2014-05-09T03:30:20.777 回答