16

我想有选择地从 AMQP 队列中删除消息,甚至不阅读它们。

场景如下:

发送方希望基于 X 类型的新信息到达的事实使 X 类型的消息过期。因为订阅者很可能还没有消费 X 类型的最新消息,所以发布者应该删除以前的 X 类型消息并将最新的消息放入队列中。整个操作对订阅者应该是透明的——事实上他应该使用像 STOMP 这样简单的东西来获取消息。

如何使用 AMQP 做到这一点?或者也许在另一个消息传递协议中更方便?

我想避免复杂的基础设施。所需的整个消息传递就像上面一样简单:一个队列、一个订阅者、一个发布者,但发布者必须具有根据给定条件临时删除消息的能力。

发布者客户端将使用 Ruby,但实际上,一旦我发现如何在协议中执行它,我就会处理任何语言。

4

4 回答 4

10

你不想要一个消息队列,你想要一个键值数据库。例如,您可以使用 Redis 或 Tokyo Tyrant 来获得一个简单的网络可访问键值数据库。或者只使用内存缓存。

每个消息类型都是一个键。当您使用相同的密钥编写新消息时,它会覆盖以前的值,因此该数据库的读者将永远无法获得过时的信息。

此时,您只需要一个消息队列来确定读取密钥的顺序,如果这很重要的话。否则,只需不断扫描数据库即可。如果您确实不断地扫描数据库,最好将数据库放在阅读器附近以减少网络流量。

我可能会做这样的事情 key: typecode value: lastUpdated, important data

然后我会发送包含 typecode, lastUpdated这样的消息,读者可以将该密钥的 lastupdated 与他们上次从数据库中读取的密钥进行比较并跳过阅读它,因为它们已经是最新的。

如果您确实需要使用 AMQP 执行此操作,请使用 RabbitMQ 和自定义交换类型,特别是 Last Value Cache Exchange。示例代码在这里https://github.com/squaremo/rabbitmq-lvc-plugin

于 2011-06-01T04:30:00.487 回答
8

您目前无法在 RabbitMQ(或更一般地说,在 AMQP)中自动执行此操作。但是,这是一个简单的解决方法。

假设您要发送三种类型的消息:Xs、Ys 和 Zs。如果我正确理解您的问题,当 X 消息到达时,您希望代理忘记所有其他尚未传递的 X 消息。

这在 RabbitMQ 中很容易做到:

  • 生产者声明了三个队列:X、Y 和 Z(它们自动绑定到默认交换器,其名称作为路由键,这正是我们想要的),
  • 发布消息时,生产者首先清除相关队列(因此​​,如果它正在发布 X 消息,它首先清除 X 队列);这有效地删除了过时的消息,
  • 消费者只需从它想要的队列中消费(X 表示 X 条消息,Y 表示 Y 条消息等);从它的角度来看,它只需要执行 basic.get 即可获得下一条相关消息。

当两个生产者几乎同时发送相同类型的消息时,这意味着竞争条件。结果是一个队列可能同时有两个(或更多)消息,但是因为消息的数量是生产者数量的上限,并且因为多余的消息在下一次发布时被清除,这应该不是什么大问题。

总而言之,这个解决方案比最优解决方案多了一个步骤,即在发布类型 X 的消息之前清除队列 X。

如果您在设置此配置时需要任何帮助,寻求建议的最佳地点是 rabbitmq-discuss 邮件列表。

于 2010-08-08T22:06:26.460 回答
4

如果您只想从队列中删除前 n 条消息,它似乎也适用于 RabbitMQ Web-UI

  • 从选项卡“队列”中选择队列,向下滚动到“获取消息”部分
  • 设置参数“Requeue=No”和要从队列中删除的消息数
  • 按“获取消息”按钮
于 2016-04-22T08:12:11.367 回答
3

这个问题由于它的标题而具有很高的知名度。通过描述更具体的场景。因此,对于那些希望从队列中实际删除下一条(记住 FIFO)消息的用户,您可以使用 rabbitmqadmin 并发出以下命令:

rabbitmqadmin get queue=queuename requeue=false count=1

这个命令本质上是在消费消息并且什么都不做。带有标志以备份消息的完整命令可能如下所示。确保根据您的要求添加任何其他参数。

sudo python rabbitmqadmin -V virtualhostname -u user -p pass get queue=queuename requeue=false count=1 payload_file=~/origmsg

于 2015-11-23T15:06:58.780 回答