1

我有一个代码,我将数据发送到我们的队列,然后队列发送回确认,说他们已经收到数据,所以我等待 X 时间,然后检查他们是否收到数据。以下是执行此操作的代码并且可以正常工作:

  public boolean send(final long address, final byte[] records, final Socket socket) {
    boolean sent = sendAsync(address, records, socket, true);
    if (sent) {
      try {
        TimeUnit.MILLISECONDS.sleep(800);
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    }
    // if key is not present, then acknowledgement was received successfully
    sent = !acknowledgementCache.asMap().containsKey(address);
    // and key is still present in the cache, then it means acknowledgment was not received after
    // waiting for timeout period, so we will remove it from cache.
    if (!sent)
      removeFromRetryBucket(address);
    return sent;
  }

800 milliseconds现在上面代码的问题是 - 我无论如何都在等待,这是错误的。确认可能在 100 毫秒内返回,但我仍然等待 800,所以我想在确认返回后立即返回,而不是等待 X 时间。

所以我想出了下面的代码,它使用等待,但由于某种原因它没有按预期工作。意思是,即使确认很快回来,它仍然超时。我也尝试将超时值增加到非常高的数字,但它仍然超时,所以看起来有些错误。有没有更好的方法来做到这一点?

  public boolean send(final long address, final byte[] records, final Socket socket) {
    boolean sent = sendAsync(address, records, socket, true);
    if (sent) {
      try {
        // if key is not present, then acknowledgement was received successfully
        Awaitility.await().atMost(800, TimeUnit.MILLISECONDS)
            .untilTrue(new AtomicBoolean(!acknowledgementCache.asMap().containsKey(address)));
        return true;
      } catch (ConditionTimeoutException ex) {
      }
    }
    // and key is still present in the cache, then it means acknowledgment was not received after
    // waiting for timeout period, so we will remove it from cache.
    removeFromRetryBucket(address);
    return false;
  }

注意:到目前为止,我正在使用 Java 7。我确实可以使用 Guava,所以如果除了等待之外还有什么更好的东西,那么我也可以使用它。

4

1 回答 1

2

为了能够在 Java 7 中检查,您需要编写一个可调用对象。

@Test
public void send() {
    //when
    boolean sent = sendAsync(address, records, socket, true);
    //then
    if (sent) {
        await().until(receivedPackageCount(), equalTo(false));
    }
}

private Callable receivedPackageCount(String address) {
    return new Callable() {
        @Override
        public boolean call() throws Exception {
            return acknowledgementCache.asMap().containsKey(address);
        }
    };
}

它必须与上面类似。可能会出现编译错误,因为我是在没有 ide 的情况下编写的。

于 2020-02-20T22:01:30.803 回答