0

我正在实现com.google.apphosting.api.ApiProxy.Delegate 接口,以便在我的 GAE/J 应用程序上代理 memcache API 调用。我的代理代码已执行,即我正在拦截 memcache API 调用。

public class CustomDelegate implements Delegate {
    // ...
    public byte[] makeSyncCall(
        Environment environment,
        String packageName,
        String methodName,
        byte[] request) throws ApiProxyException
    {
        return this.baseDelegate.makeSyncCall(environment, packageName, methodName, request);         
    }
    // ...
}

现在我想自己检查 memcache 的 get/set/increment 请求。将makeSyncCall()byte[] 请求参数反序列化为

  • com.google.appengine.api.memcache.MemcacheServicePb$MemcacheSetRequest
  • com.google.appengine.api.memcache.MemcacheServicePb$MemcacheGetRequest
  • com.google.appengine.api.memcache.MemcacheServicePb$MemcacheIncrementRequest

将允许我 * 检测热键 * 跟踪项目大小

如何将byte[] 请求传输到 Memcache*Request 实例?

4

1 回答 1

1

这是一些反序列化请求和响应的代码示例

import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheGetRequest;
import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheGetResponse;
import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheIncrementRequest;
import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheIncrementResponse;
import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheSetRequest;
import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheSetResponse;
...

  private static void examineRequest(String pkg, String method, byte[] request) {
    try {
      if (pkg.equals("memcache")) {
        if (method.equals("Get")) {
          MemcacheGetRequest req = MemcacheGetRequest.parseFrom(request);
          for (Iterable<Byte> key : req.getKeyList()) {
            ... key ...
          }
        } else if (method.equals("Set")) {
          MemcacheSetRequest req = MemcacheSetRequest.parseFrom(request);
          for (MemcacheSetRequest.Item item : req.getItemList()) {
            ... item.getKey() ... item.getValue() ...
          }
        } else if (method.equals("Increment")) {
          MemcacheIncrementRequest req = MemcacheIncrementRequest.parseFrom(request);
          ... req.getKey()... req.getDelta()... req.getInitialValue() ... 
        }
      }
    } catch (IOException e) {
      ... 
    }
  }

  private static void examineResponse(String pkg, String method, byte[] response) {
    try {
      if (pkg.equals("memcache")) {
        if (method.equals("Get")) {
          MemcacheGetResponse resp = MemcacheGetResponse.parseFrom(response);
          for (MemcacheGetResponse.Item item : resp.getItemList()) {
            ... item.getKey() ... item.getValue() ...
          });

        } else if (method.equals("Set")) {
          MemcacheSetResponse resp = MemcacheSetResponse.parseFrom(response);
          ... resp.getSetStatusList() ... 

        } else if (method.equals("Increment")) {
          MemcacheIncrementResponse resp = MemcacheIncrementResponse.parseFrom(response);
          ... resp.getNewValue() ... 

        }
      }
    } catch (IOException e) {
      ... 
    }

  }

从 中调用上述makeSyncCall内容很简单,但从makeAsyncCall方法中调用有点棘手,因为您希望推迟检查响应,直到 Future 完成之后。一种方法是包装 Future,如下所示

public class ApiProxyHook<E extends Environment> implements Delegate<E> {
  private Delegate<E> baseDelegate;
  ... 

  @Override
  public byte[] makeSyncCall(E environment, String pkg, String method, byte[] request)
      throws ApiProxy.ApiProxyException {
    byte[] response = this.baseDelegate.makeSyncCall(environment, pkg, method, request);

    examineRequest(pkg, method, request);
    examineResponse(pkg, method, response);

    return response;
  }
  ... 

  @Override
  public Future<byte[]> makeAsyncCall(E env, final String pkg, final String method, byte[] request, ApiConfig apiConfig) {

    examineRequest(pkg, method, request);

    Future<byte[]> innerFuture = baseDelegate.makeAsyncCall(env, pkg, method, request, apiConfig);
    return new HookedFuture<byte[]>(innerFuture) {
      @Override
      protected void futureResultIs(byte[] response) {

        examineResponse(pkg, method, response);

      }
    };

  }
  ... 
}

HookedFuture班级在哪里

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * A future that wraps another future, and behaves just like it, except that at the point
 * when the future completes a hook is called.
 *
 * @param <T> the type returned by the future.
 */
public abstract class HookedFuture<T> implements Future<T> {

  private final Future<T> innerFuture;

  /** Make sure hook is only called once */
  private boolean hookCalled = false;

  public HookedFuture(Future<T> future) {
    innerFuture = future;
  }

  /** Client implements this hook, which will be called back on completion of the first
   *   get() */
  protected abstract void futureResultIs(T result);

  @Override
  public boolean cancel(boolean mayInterruptIfRunning) {
    return innerFuture.cancel(mayInterruptIfRunning);
  }

  @Override
  public boolean isCancelled() {
    return innerFuture.isCancelled();
  }

  @Override
  public boolean isDone() {
    return innerFuture.isDone();
  }

  @Override
  public T get() throws InterruptedException, ExecutionException {
    T result = innerFuture.get(); // may block here
    if (!hookCalled) {
      futureResultIs(result);
      hookCalled = true;
    }
    return result;
  }

  @Override
  public T get(long timeout, TimeUnit unit)
      throws InterruptedException, ExecutionException, TimeoutException {
    T result = innerFuture.get(timeout, unit); // may block here
    if (!hookCalled) {
      futureResultIs(result);
      hookCalled = true;
    }
    return result;
  }
}
于 2013-07-16T20:15:15.900 回答