2

我正在构建一个系统,其中两个不同的实体需要处理来自同一源的消息(以不同的方式 - 例如一个将记录所有消息,而另一个实体可能想要聚合数据)。

理想情况下,每个实体在性能和弹性方面都是完全可扩展的,因此我们有多个发布者、多个日志订阅者和多个聚合订阅者,但每个发布者生成的每条消息仍然只由一个日志订阅者和一个聚合订阅者处理。

使用 AMQP,我们可以通过将消息发布到扇出交换器来实现这一点,该交换器将消息分发到两个队列,每个队列都有许多订阅者。我知道在 NATS 中可以实现相同的行为,只需让所有订阅者根据其角色使用两个不同的“队列组名称”来监听相同的“主题”。

在这种情况下,发给主题的消息将被从每个队列组传递给一个订阅者,即每条消息将被准确地传递 n 次,n 是不同队列组的数量而不是订阅者的数量。这个对吗?

4

2 回答 2

4

事实上,您可以使用队列订阅者(例如在 Go 中,它会是这样的 API func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error):.

queue组名。例如,它可能在您的示例中logaggregation. 您可以在每个组上创建尽可能多的队列订阅者,并且每个组中只有 1 个成员会收到给定的消息。

例如,假设您发布了关于主题的消息,foo并且您有 10 个队列订阅者foo使用队列名称log和 10 个队列订阅者foo使用队列名称aggregation。消息将传递给 2 个订阅者,1 个用于 group log,1 个用于 group aggregation

希望这可以帮助。

于 2017-07-06T17:38:49.727 回答
0

您的方法是正确的,nats.io 中队列的概念是在侦听队列的订阅者之间顺序分发消息。这种分布以线性方式发生,假设您有 10 个订阅者(S1-S10)正在监听一个主题并在同一个队列上注册,那么第一条消息将被发送到 S1,然后以循环方式发送到 S2,依此类推。

您只需要确保所有订阅者都连接到服务器,就像订阅者下线一样,NATS 服务器将在某些未完成的 PING-PONG 请求后意识到此事件,并在此时间间隔内将消息转发到离线节点。因此需要仔细设置

  1. 乒乓间隔
  2. 最大未完成的 PING 请求

https://nats.io/documentation/server/gnatsd-config/

于 2017-10-05T05:44:22.877 回答