
I'm loading a local Couchbase instance with application-specific JSON objects.

Relevant code is:

CouchbaseClient getCouchbaseClient()
{
    List<URI> uris = new LinkedList<URI>();
    uris.add(URI.create("http://localhost:8091/pools"));
    CouchbaseConnectionFactoryBuilder cfb = new CouchbaseConnectionFactoryBuilder();
    cfb.setFailureMode(FailureMode.Retry);
    cfb.setMaxReconnectDelay(1500); // maximum delay between reconnect attempts
    cfb.setOpTimeout(10000); // wait up to 10 seconds for an operation to succeed
    cfb.setOpQueueMaxBlockTime(5000); // wait up to 5 seconds when trying to
                                      // enqueue an operation
    return new CouchbaseClient(cfb.buildCouchbaseConnection(uris, "my-app-bucket", ""));
}

The method to store an entry (I'm using the suggestions from "Bulk Load and Exponential Backoff"):

  void continuosSet(CouchbaseClient cache, String key, int exp, Object value, int tries)
  {
    OperationFuture<Boolean> result = null;
    OperationStatus status = null;
    int backoffexp = 0;

    do
    {
      if (backoffexp > tries)
      {
        throw new RuntimeException(MessageFormat.format("Could not perform a set after {0} tries.", tries));
      }

      result = cache.set(key, exp, value);
      try
      {
        if (result.get())
        {
          break;
        }
        else
        {
          status = result.getStatus();
          LOG.warn(MessageFormat.format("Set failed with status \"{0}\" ... retrying.", status.getMessage()));
          if (backoffexp > 0)
          {
            double backoffMillis = Math.pow(2, backoffexp);
            backoffMillis = Math.min(1000, backoffMillis); // 1 sec max
            Thread.sleep((int) backoffMillis);
            LOG.warn("Backing off, tries so far: " + backoffexp);
          }
          backoffexp++;
        }
      }
      catch (ExecutionException e)
      {
        LOG.error("ExecutionException while doing set: " + e.getMessage());
      }
      catch (InterruptedException e)
      {
        LOG.error("InterruptedException while doing set: " + e.getMessage());
      }
    }
    while (status != null && status.getMessage() != null && status.getMessage().indexOf("Temporary failure") > -1);
  }

When the continuosSet method is called for a large number of objects to store (single thread), e.g.

CouchbaseClient cache = getCouchbaseClient();
do
{
    SerializableData data = queue.poll();
    if (data != null)
    {
        final String key = data.getClass().getSimpleName() + data.getId();
        continuosSet(cache, key, 0, gson.toJson(data, data.getClass()), 100);
...

it throws a CheckedOperationTimeoutException inside the continuosSet method, in the result.get() call.

Caused by: net.spy.memcached.internal.CheckedOperationTimeoutException: Timed out waiting for operation - failing node: 127.0.0.1/127.0.0.1:11210
        at net.spy.memcached.internal.OperationFuture.get(OperationFuture.java:160) ~[spymemcached-2.8.12.jar:2.8.12]
        at net.spy.memcached.internal.OperationFuture.get(OperationFuture.java:133) ~[spymemcached-2.8.12.jar:2.8.12]

Can someone shed light on how to overcome and recover from this situation? Is there a good technique or workaround for bulk loading with the Java client for Couchbase? I have already explored the documentation on Performing a Bulk Set, which unfortunately is for the PHP Couchbase client.


1 Answer


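My suspicion is that you may be running this in a JVM spawned from the command line that doesn't have much memory. If that's the case, you could be hitting longer GC pauses, which could cause the timeouts you mention.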

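I think the best approach is to try a couple of things. First, raise the -Xmx argument to the JVM so it can use more memory, and see whether the timeouts happen later or go away. If so, then my suspicion about memory is correct.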
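To check the memory theory, it can help to log how much heap the JVM actually has and how much time it has spent in garbage collection. Below is a minimal sketch using only the standard library; the class and method names (`HeapReport`, `maxHeapMiB`, `totalGcMillis`) are hypothetical and not part of any Couchbase API.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

/** Hypothetical helper for illustration; not part of the Couchbase client. */
final class HeapReport {

    /** Maximum heap the JVM will attempt to use, in MiB (this reflects -Xmx). */
    static long maxHeapMiB() {
        return Runtime.getRuntime().maxMemory() / (1024L * 1024L);
    }

    /** Accumulated GC time in milliseconds, summed over all collectors. */
    static long totalGcMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long millis = gc.getCollectionTime(); // -1 when the collector does not report it
            if (millis > 0) {
                total += millis;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("Max heap (MiB): " + maxHeapMiB());
        System.out.println("GC time so far (ms): " + totalGcMillis());
    }
}
```

Calling `totalGcMillis()` before and after the load loop, and comparing a run with the default heap against one started with a larger value (for example `java -Xmx1g ...`, where the size is only an illustration), should show whether GC time grows alongside the timeouts.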

If that doesn't work, raise the value passed to setOpTimeout() and see whether that reduces the errors or makes them go away.

Also, make sure you're using the latest client.

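By the way, I don't think this is directly related to bulk loading. It may happen because of the heavy resource consumption during a bulk load, but it looks like either the regular backoff is working or you're never hitting it.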
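On the recovery side, the stack trace in the question suggests the timeout surfaces from result.get() as an exception rather than as a failed status, so a backoff loop that only inspects the status would never see it. Here is a minimal sketch of a backoff helper that also treats exceptions as failed attempts. It uses only the standard library and assumes Java 8 lambdas; `BackoffRetry`, `retry`, and `backoffMillis` are hypothetical names, and retrying on every exception type is an assumption you may want to narrow.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper for illustration; not part of the Couchbase client. */
final class BackoffRetry {

    /** Delay before the next try: 2^attempt ms (attempt is 0-based), capped at maxMillis. */
    static long backoffMillis(int attempt, long maxMillis) {
        if (attempt >= 62) {
            return maxMillis; // avoid overflowing the shift
        }
        return Math.min(maxMillis, 1L << attempt);
    }

    /**
     * Calls op until it returns true or maxTries attempts have been made.
     * A false result or any exception other than InterruptedException
     * counts as a failed attempt. Returns the number of attempts used.
     */
    static int retry(Callable<Boolean> op, int maxTries, long maxBackoffMillis)
            throws InterruptedException {
        Exception last = null;
        for (int attempt = 0; attempt < maxTries; attempt++) {
            try {
                if (Boolean.TRUE.equals(op.call())) {
                    return attempt + 1;
                }
            } catch (InterruptedException e) {
                throw e; // do not swallow interruption
            } catch (Exception e) {
                last = e; // e.g. a wrapped timeout; retry after backing off
            }
            if (attempt < maxTries - 1) {
                Thread.sleep(backoffMillis(attempt, maxBackoffMillis));
            }
        }
        throw new IllegalStateException("Operation failed after " + maxTries + " tries", last);
    }
}
```

With the client from the question, a call might look like `BackoffRetry.retry(() -> cache.set(key, exp, value).get(), 100, 1000)`. Because a timed-out set may still have been applied on the server, this is only safe for operations that can be repeated, which a plain set normally can.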

Answered 2014-02-08T18:04:03.050