9

我是使用队列的新手。我能够成功地发布消息并同步接收它们但是,我现在正在尝试异步。

sqs 提供的参考链接建议使用 jmsclient 包装器。如果您已经有一个集成到 jms 客户端的代码,该链接还提到使用它。

http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/jmsclient.html#samples

但是我重新开始,我参考了这个例子来同步发送和接收消息。

https://github.com/aws/aws-sdk-java/blob/master/src/samples/AmazonSimpleQueueService/SimpleQueueServiceSample.java

我可以使用相同的代码但使用消息侦听器来实现它吗?任何代码示例将不胜感激。

4

5 回答 5

9

Amazon SQS 开发人员指南的有关将 JMS 与 Amazon SQS结合使用的部分中有一个代码示例,它显示了如何使用 JMS 异步接收消息。

首先你实现 MessageListener 接口:

class MyListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        try {
            // Cast the received message as TextMessage and print the text to screen.
            if (message != null) {
                System.out.println("Received: " + ((TextMessage) message).getText());
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

然后将其设置为 MessageConsumer 的 MessageListener:

// Create a consumer for the 'TestQueue'.
MessageConsumer consumer = session.createConsumer(queue);

// Instantiate and set the message listener for the consumer.
consumer.setMessageListener(new MyListener());

// Start receiving incoming messages.
connection.start();

// Wait for 1 second. The listener onMessage() method will be invoked when a message is received.
Thread.sleep(1000);
于 2016-11-02T23:17:44.967 回答
4

您可以使用sqslistenerSpringCloud 框架中的注释。如果您正在开发应用程序Spring并且AWS您没有使用Spring Cloud,那么现在是您切换的好时机。

这是使用sqslistener注释从 SQS 异步接收消息的示例代码。一件好事是你必须几乎零配置才能使用它:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Component;
import com.example.my.RecoverableException;

@Component
@Slf4j
public class CustomMessageQueue {

    @SqsListener(value = "${build_request_queue.name}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
    public void receive(String message) {
        try {
            // write message processing here
        } catch (RecoverableException e) {
            // handle errors here for which message from queue should not be deleted
            // throwing an exception will make receive method fail and hence message does not get deleted
            throw e;

        } catch (Exception e) {
            // suppress exceptions for which message should be deleted.
        }
    }
}

sqslistener注释的伟大之处在于它的deletionPolicy. 因此,您可以决定何时删除来自 SQS 的消息。

于 2018-11-23T18:48:07.967 回答
0

我使用 aws sdk 连接到 SQS

AWSCredentials credential = new BasicAWSCredentials(accessKey, secretKey);
AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(credential);
AmazonSQS sqs = AmazonSQSClientBuilder.standard().withRegion(Regions.US_EAST_1).withCredentials(
            awsCredentialsProvider).build();

ReceiveMessageRequest messageRequest = new ReceiveMessageRequest(queueUrl).withWaitTimeSeconds(10)
            .withMaxNumberOfMessages(10);;

ReceiveMessageResult queueResult = sqs.receiveMessage(messageRequest);
    List<Message> messages = offerchangenotificationQue.getMessages();
    for (Message message : messages) {
        log.info("Receive message from queue {}", message.getBody());
    }
于 2020-05-26T00:02:24.503 回答
0

虽然这是一个老问题,但我将添加另一个我在我的 Python 应用程序中使用的解决方案。它可能会让您知道如何用任何语言进行操作。

我有一种由 SQS 触发的代理功能(侦听器)。在这个函数中,我有循环消息并通过执行 lambda client的调用方法来调用另一个 Lambda 函数的 for 循环。每条消息都“知道”应该调用的函数名称。

我将此代理功能用于应用程序中的所有其他功能,并由 SQS 触发。

这样我就可以利用最近添加的 Lambda 目的地功能。

于 2020-02-10T18:11:28.560 回答
-1

SQS 代表“简单队列服务”。从字面上看,它的意思很简单。因此,它不支持 JMS 的某些细节,尤其是异步侦听器。

我写了一篇关于这个主题的博客文章:http: //thedulinreport.com/2015/05/09/guaranteeing-delivery-of-messages-with-aws-sqs/

基本上,您需要做的是在无限循环中编写一个轮询器,但您希望对此保持精明——您不想因为每次请求都被收费而继续进行太多轮询。

于 2015-08-26T11:47:54.833 回答