29

我知道 SQS 不是为此而构建的,但我很好奇是否可以在队列中找到符合某些标准的消息?

我可以循环拉取消息,在消息正文中搜索某种模式(甚至无需反序列化它们),然后过滤我需要的消息。但随后可能会以无限循环结束——当我到达队列末尾时,我读到的第一条消息将返回到队列中......

可以扩展消息的可见性,但是我如何确切知道扫描整个队列需要多长时间,以及应该扩展可见性多长时间?如果我在那里确实有一万条消息怎么办?

这里有什么解决方法吗?我需要扫描队列中的一些消息,然后删除那些...

4

11 回答 11

27

简短的回答:没有。

队列是为诸如任务之类的事情而设计的。机器从队列中抓取一个新任务(即消息),执行该任务,然后删除该任务。

如果您尝试搜索消息以过滤它们,我不禁想知道您是否使用了错误的工具来完成这项工作……</p>

于 2012-10-05T08:33:20.370 回答
6

我不认为简短或冗长的答案是“否”。

这是两个“是”的对位解决方案。

  1. 遍历Queue,维护访问列表
  2. 使用企业集成模式(消息路由)根据标准将消息拆分为下游

1.遍历Queue,维护访问列表

考虑一个有N消息的队列的情况,没有消息被添加或删除。如果没有其他信息(例如,如果您知道有多少消息应该符合您的条件),您需要遍历所有N消息。

这里的关键点是知道您何时遍历了所有N消息。这里有一些问题。

  1. 要准确了解,您必须在将消息添加到队列时对其进行跟踪
  2. 大致了解一下,可以得到ApproximateNumberOfMessages队列的属性
  3. 或者你可以在一个循环中接收消息,维护一个访问列表,并假设你最终会从你的队列被分片的每个服务器中采样和耗尽消息

为了维护访问列表,当您接收消息并评估您的匹配标准时,您可以存储message_id所有访问消息的列表。

消息 ID几乎是唯一的。看到这个线程

https://forums.aws.amazon.com/message.jspa?messageID=76119

如果您选择 (3),您将无法确定需要多少次迭代才能耗尽队列。但是,如果您无限期地执行此操作,则只要 SQS 分片服务器上的加权随机分布为它们提供所有非零概率,就可以保证使队列充满。

2. 使用企业集成模式(消息路由)根据标准将消息拆分为下游

如果您可以控制您的消息传递体系结构,您可以使用消息路由器作为“前端”消息处理器,根据标准将消息分派给不同的接收者。

具体来说,您将使用基于内容的路由器:

http://www.enterpriseintegrationpatterns.com/patterns/messaging/ContentBasedRouter.html

于 2017-04-04T06:39:29.623 回答
3

我们有类似的要求,最终得到了本“动手”教程中描述的架构:过滤发布到主题的消息

本质上,不是将事件/消息发布到 SQS 队列,而是将它们发布到 SNS 主题,每个消费者将拥有自己的订阅该主题的 SQS 队列。然后,您可以使用SNS 订阅过滤器来确保只有相关消息排入每个消费者的队列。

这会产生额外的基础设施开销,但它作为我们的解决方案效果很好。

于 2020-04-06T17:00:11.313 回答
1

即使在请求特定属性时,对于不包含该属性的消息,该值只会设置为 null,您仍然可以使用某种方式进行过滤。那些没有按照您想要的方式设置属性的人可以将其可见性设置为 1 然后释放,因此它们将保留在队列中。将提供一种粗略的方式来进行优先级排队,尽管您可以根据消息内容轻松地做同样的事情。

于 2016-08-15T17:22:13.287 回答
1

用不同的案例进行了测试。这没用。答案是不

测试数据

public void fillQueueWithMessages(){

  MessageAttributeValue value1 = new MessageAttributeValue();
  value1.setDataType("String");
  value1.setStringValue("1");

  SendMessageRequest send_msg_request = new SendMessageRequest()
      .withQueueUrl(env.getProperty("cloud.aws.sqs.readyForTranslation.url"))
      .withMessageBody("test1").addMessageAttributesEntry(value1.getStringValue(), value1);
  amazonSqs.sendMessage(send_msg_request);


  MessageAttributeValue value2 = new MessageAttributeValue();
  value2.setDataType("String");
  value2.setStringValue("2");


  SendMessageRequest send_msg_request2 = new SendMessageRequest()
      .withQueueUrl(env.getProperty("cloud.aws.sqs.readyForTranslation.url"))
      .withMessageBody("test2").addMessageAttributesEntry(value2.getStringValue(), value2);
  amazonSqs.sendMessage(send_msg_request2);

  SendMessageRequest send_msg_request3 = new SendMessageRequest()
      .withQueueUrl(env.getProperty("cloud.aws.sqs.readyForTranslation.url"))
      .withMessageBody("test3").addMessageAttributesEntry(value1.getStringValue(), value1);
  amazonSqs.sendMessage(send_msg_request3);

}

测试

public void shouldPollMessagesBasedOnMessageAttribute() throws InterruptedException {

ReceiveMessageRequest request =
    new ReceiveMessageRequest(env.getProperty("cloud.aws.sqs.readyForTranslation.url"));
request.setMaxNumberOfMessages(3);
request.setWaitTimeSeconds(20);
request.withMessageAttributeNames("1");

List<Message> messages = new ArrayList<Message>();

messages = amazonSqs.receiveMessage(request).getMessages();

assertEquals(2, messages.size());
}
于 2017-11-03T14:40:13.783 回答
1

对于完全控制一切的谦逊的开发工程师:

(1) 快速关闭消费者,因此消息被捕获到队列中。

(2) 关闭源。

(3) 读取所有 SQS 队列以查找您的消息,同时复制到“临时”队列。

(4) 将所有“临时”队列复制回 SQS 队列。谷歌它有很多工具。

(5)重启源和消费者。

如果您事先想到,另一种方法是使用 SNS 或其他东西复制到辅助“devops”队列,并在需要查找消息时通读该队列。您可以将“devops”队列的保留期设置得较短,以使其大小保持合理。

于 2019-12-12T15:58:09.283 回答
0

好的,这个问题永远是老生常谈了,doron-BGU bgu'声称属性如他所描述的那样工作也是如此。

我有几个“生产者”将相同的 JSON 发送到我的 SQS 中,我的“消费者”必须根据它们的来源、移动客户端、MVC 客户端或其他内部桌面应用程序与他们共舞。

我真的很想测试doron-BGU bgu的理论真的很糟糕。但为了务实,我的单个消费者对来自队列的这些消息执行明显不同的处理,只是检查每个消费者在 JSON 中的一个值,我强迫生产者填写定义消息源的值。苹果在这里,橘子在那边。

于 2020-02-29T20:57:30.790 回答
0

老话题,但可能会有所帮助您可以将 FIFO 与 GroupId 一起用于 帮助我的消息线程的小列表

于 2020-06-04T07:33:25.330 回答
0

如果您有一个相当小的消息过滤器列表,您可以为每个模式创建一个单独的队列。

例如,如果您的过滤对象是“颜色”,并且您知道值只有红色、蓝色和绿色,您可以为每种颜色创建一个队列。这样你就不需要过滤——只需从适当的队列中提取。显然,如果可以有 1000 种颜色,那么这种方法是不切实际的。

于 2021-10-13T15:42:25.473 回答
-1

让我们通过一些示例来理解这一点,因此创建 10 条消息并发送它

// Send a message
for (int i = 0; i < 10; i++) {
    System.out.println("Sending a message to MyQueue.\n");
    Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
    // extra code

    String sdate;
    Format formatter;
    Date date = new Date();

    // 2012-12-01
    formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
    sdate = formatter.format(date);
    System.out.println(sdate);

    messageAttributes.put("Datestamp"+i, new MessageAttributeValue().withDataType("String").withStringValue(sdate));

    Map<String, MessageAttributeValue> messageAttributes1 = new HashMap<>();
    messageAttributes1.put("attributeName", new MessageAttributeValue().withDataType("String").withStringValue(sdate));
    SendMessageRequest request = new SendMessageRequest();
    request.withMessageBody("A test message body."+sdate);
    request.withQueueUrl(myQueueUrl);
    request.withMessageAttributes(messageAttributes);
    sqs.sendMessage(request);
}

现在,即使您有 10 条带有 datetimestamp1 到 datetimestamp10
属性的消息过滤也不起作用

让我们尝试使用一些 myTag 属性进行过滤

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);

//ReceiveMessageRequest receiveRequest = new ReceiveMessageRequest(queueUrl);

receiveMessageRequest.withMaxNumberOfMessages(10);
receiveMessageRequest.withMessageAttributeNames("myTag");
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();

它提供 10 条消息,并且 myTag 值为 null

message.getMessageAttributes().get("Datestamp") 为空 message.getMessageAttributes().get("myTag") 为空

因此,我们无法使用消息属性进行过滤,就好像找不到该键一样。没有消息属性或所有消息属性相同。

这么长的答案是NOOOOO

于 2016-05-24T03:45:29.083 回答
-6

这其实不全是真的,

实际上,您可以使用消息属性技巧“有点”过滤队列中的消息。

每条消息都可以包含您可以在创建消息时添加的属性(您需要为每个属性提供 3 项内容:名称、类型、值)。

稍后,当您创建新的 ReceiveMessageRequest 对象时,您可以使用“withMessageAttributeNames”来指定一个属性,实际发生的是您的队列被过滤为包含该特定属性的消息。

例如:

String queueUrl = sqs.getQueueUrl("myQueue").getQueueUrl();

ReceiveMessageRequest receiveRequest = new ReceiveMessageRequest(queueUrl);

receiveRequest.withMaxNumberOfMessages(10);
receiveRequest.withMessageAttributeNames("myTag");

如果您的队列包含 5 条消息,但只有 1 条具有“myTag”属性,则只会返回该特定消息。

这对我来说是压倒性的,因为这在ReceiveMessageRequest API中没有提到

所以基本上你所要做的就是给每条消息一个唯一的属性(请注意属性限制:The message attribute name can contain the following characters: A-Z, a-z, 0-9, underscore (_), hyphen (-), and period (.). The name must not start or end with a period, and it should not have successive periods. The name is case sensitive and must be unique among all attribute names for the message. The name can be up to 256 characters long. The name cannot start with "AWS." or "Amazon." (or any variations in casing)

于 2016-04-02T21:24:53.210 回答