0

现在,我正在尝试实现基于队列工作者的设计,我们在队列中接收数百万条消息。而且工人是有限的,所以我使用以下代码将工作分配给工人。我正在使用 ExecutorService 来做同样的事情:

    ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE); 

    while(LISTEN_FOR_MESSAGE_FLAG == true){
      Message receivedMessage = sqsClient.receiveMessage(request);

      if(receivedMessage == null){
        Thread.sleep(5000); // sleep for 5 seconds
      }
      else {
        // lock the message for a certain amount of time (60 secs). 
        // Other workers can't receive a message, when it is locked.
        sqsClient.changeMessageVisibility(receivedMessage, 60); 

        pool.execute(new Task(receivedMessage); // process the message.
      }
    }

我目前正在将 Amazon SQS 用于队列。上面的代码有严重的问题。它每 5 秒接收一次消息,并使用可见性超时锁定它们。一旦可见性超时消失,这个锁就会被打破。这会导致工作人员持有不再锁定的消息。因此,存在重复处理的问题。注意:Amazon SQS 提供了一种延长可见性超时的方法。

请帮助,如何编写上述代码来处理这种情况。

4

1 回答 1

1

如果你改变

pool.execute(new Task(receivedMessage);

至 :

Future<?> task = pool.submit(new Task(receivedMessage));

您可以执行一个辅助任务,在时间用完之前延长可见性时间。辅助任务基本上在 Future 上执行 get(),超时时间短于 60 秒锁定。当发生 TimeoutException 时,它会延长可见性超时,并再次开始等待 Future。

pool.execute(new Runnable() {
    @Override
    public void run() {
        boolean done = false;
        while (!done) {
            try {
                task.get(50, TimeUnit.SECONDS);
                done = true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                done = true;
            } catch (ExecutionException e) {
                done = true;
                // handle e.getCause()
            } catch (TimeoutException e) {
                // we need more time
                sqsClient.changeMessageVisibility(receivedMessage, 60);
            }
        }

    }
});
于 2013-08-22T09:21:03.220 回答