1

我正在研究一个要求,即一个进程(比如生产者)需要向可变数量的进程(比如消费者)发送单向消息。

发布-订阅模型似乎对此很好,因为消费者将订阅来自生产者的消息。我尝试使用ZeroMQ来实现这一点。

但是,我有一些问题:

  1. 消费者必须不断地轮询消息。当有新消息时,我会通知消费者。

  2. 生产者队列有可能被填满。我希望生产者根据某些条件从队列中删除消息(比如删除超过 5 秒的消息,或者删除已阅读 5 次的消息)。

  3. 由于消费者正在轮询并且消息没有从队列中删除,因此消费者会看到重复的消息,直到有新消息进来。我希望每条新消息只通知消费者一次。

我知道我可能使用了错误的模型(发布订阅可能不合适)。我曾考虑过使用 request-reply,但这不起作用,因为生产者不想跟踪消费者的数量。

任何人都可以提出一个好的选择吗?

4

4 回答 4

1

DDS(数据分发服务)中间件完全支持您想要实现的目标,而且更容易实现。

直接回答你的问题:

  1. DDS 支持监听机制,你的订阅者不需要不断的轮询。

  2. DDS 具有良好的 QoS 设置以防止发布者队列被填满。您可以使用 History QoS 表示“仅保留队列中最新的 10 个样本”,也可以使用 Lifespan QoS 表示“仅保留最近 10 秒内发布的样本”。

  3. 同样,您可以使用 DDS 侦听器机制,并且对于每个新样本,您只会收到一次通知。无需投票。

目前有两种开源实现。

于 2012-05-31T08:50:15.713 回答
1

我建议在 Producer 和 Consumer 之间使用 Broker 的 Push-Pull 模型。

  1. 任何新消息都应通知代理。
  2. 消费者将听取经纪人的通知(保留表格以跟踪成功/失败。因此在重试期间将避免重复)
  3. 一旦 #2 完成,消费者就可以从 Producer(source) 中提取数据并将 ack 发送给 broker 以获取成功/失败

希望这可以帮助

于 2012-05-28T07:45:53.427 回答
0

您需要不止一位制作人吗?如果不是,您可以使用 PUSH/PULL 而不是 PUB/SUB。

使用 PUSH/PULL,您可以拥有任意数量的消费者(他们是模型的 PULL 方面)。写入 PUSH 端点的所有消息都以循环方式分布在所有连接的消费者之间。这也确保了两个消费者不会收到相同的消息。

正如您所描述的,如果两个或多个消费者订阅了相同的“前缀”,那么将消费者作为 SUB 端点,您最终可以将相同的消息传递给多个消费者(假设这将是您的模型中的一个问题)。

假设“前缀”是您传递给的字符串sock.setsockopt(ZMQ_SUBSCRIBE, "prefix", ...);

于 2012-03-28T01:13:55.453 回答
0

尝试使用 JMS 提供程序或 AMQP 提供程序。这些有一些你正在寻找主题的东西:

  1. 向订阅者推送通知。

  2. 消息的生存时间属性,如果消息未在 TTL 内使用,则允许将消息删除或放入死信队列。

  3. 一次性通知 - 取决于您的配置。

请注意,在网络故障的情况下,一次性消息确实具有边缘条件,这可能导致消息丢失或重复消息……您可以选择。

就使用哪个提供者而言。RabbitMQ 在 AMQP 中很受欢迎。对于 JMS,有任意数量的专有产品或开源实现。

于 2012-04-10T10:23:32.247 回答