现在,我正在尝试实现基于队列工作者的设计,我们在队列中接收数百万条消息。而且工人是有限的,所以我使用以下代码将工作分配给工人。我正在使用 ExecutorService 来做同样的事情:
ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
while(LISTEN_FOR_MESSAGE_FLAG == true){
Message receivedMessage = sqsClient.receiveMessage(request);
if(receivedMessage == null){
Thread.sleep(5000); // sleep for 5 seconds
}
else {
// lock the message for a certain amount of time (60 secs).
// Other workers can't receive a message, when it is locked.
sqsClient.changeMessageVisibility(receivedMessage, 60);
pool.execute(new Task(receivedMessage); // process the message.
}
}
我目前正在将 Amazon SQS 用于队列。上面的代码有严重的问题。它每 5 秒接收一次消息,并使用可见性超时锁定它们。一旦可见性超时消失,这个锁就会被打破。这会导致工作人员持有不再锁定的消息。因此,存在重复处理的问题。注意:Amazon SQS 提供了一种延长可见性超时的方法。
请帮助,如何编写上述代码来处理这种情况。