1

我正在开发一个库,它将一个对象DataRequest作为输入参数并基于该对象,我将构建一个 URL,然后使用 apache http 客户端调用我们的应用服务器,然后将响应返回给客户使用我们的图书馆。有些客户会调用该executeSync方法来获取相同的功能,有些客户会调用我们的executeAsync方法来获取数据。

  • executeSync()- 等到我有结果,返回结果。
  • executeAsync()- 立即返回一个 Future,如果需要,可以在其他事情完成后处理它。

下面是我的DataClient类,它有以上两种方法:

public class DataClient implements Client {
  private final ForkJoinPool forkJoinPool = new ForkJoinPool(16);
  private CloseableHttpClient httpClientBuilder;

  // initializing httpclient only once
  public DataClient() {
    try {
      RequestConfig requestConfig =
          RequestConfig.custom().setConnectionRequestTimeout(500).setConnectTimeout(500)
              .setSocketTimeout(500).setStaleConnectionCheckEnabled(false).build();
      SocketConfig socketConfig =
          SocketConfig.custom().setSoKeepAlive(true).setTcpNoDelay(true).build();

      PoolingHttpClientConnectionManager poolingHttpClientConnectionManager =
          new PoolingHttpClientConnectionManager();
      poolingHttpClientConnectionManager.setMaxTotal(300);
      poolingHttpClientConnectionManager.setDefaultMaxPerRoute(200);

      httpClientBuilder =
          HttpClientBuilder.create().setConnectionManager(poolingHttpClientConnectionManager)
              .setDefaultRequestConfig(requestConfig).setDefaultSocketConfig(socketConfig).build();
    } catch (Exception ex) {
      // log error
    }
  }

  @Override
  public List<DataResponse> executeSync(DataRequest key) {
    List<DataResponse> responsList = null;
    Future<List<DataResponse>> responseFuture = null;

    try {
      responseFuture = executeAsync(key);
      responsList = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
    } catch (TimeoutException | ExecutionException | InterruptedException ex) {
      responsList =
          Collections.singletonList(new DataResponse(DataErrorEnum.CLIENT_TIMEOUT,
              DataStatusEnum.ERROR));
      responseFuture.cancel(true);
      // logging exception here
    }
    return responsList;
  }

  @Override
  public Future<List<DataResponse>> executeAsync(DataRequest key) {
    DataFetcherTask task = new DataFetcherTask(key, this.httpClientBuilder);
    return this.forkJoinPool.submit(task);
  }
}

下面是我的DataFetcherTask类,它也有一个静态类DataRequestTask,它通过创建 URL 来调用我们的应用服务器:

public class DataFetcherTask extends RecursiveTask<List<DataResponse>> {
  private final DataRequest key;
  private final CloseableHttpClient httpClientBuilder;

  public DataFetcherTask(DataRequest key, CloseableHttpClient httpClientBuilder) {
    this.key = key;
    this.httpClientBuilder = httpClientBuilder;
  }

  @Override
  protected List<DataResponse> compute() {
    // Create subtasks for the key and invoke them
    List<DataRequestTask> requestTasks = requestTasks(generateKeys());
    invokeAll(requestTasks);

    // All tasks are finished if invokeAll() returns.
    List<DataResponse> responseList = new ArrayList<>(requestTasks.size());
    for (DataRequestTask task : requestTasks) {
      try {
        responseList.add(task.get());
      } catch (InterruptedException | ExecutionException e) {
        Thread.currentThread().interrupt();
        return Collections.emptyList();
      }
    }
    return responseList;
  }

  private List<DataRequestTask> requestTasks(List<DataRequest> keys) {
    List<DataRequestTask> tasks = new ArrayList<>(keys.size());
    for (DataRequest key : keys) {
      tasks.add(new DataRequestTask(key));
    }
    return tasks;
  }

  // In this method I am making a HTTP call to another service
  // and then I will make List<DataRequest> accordingly.
  private List<DataRequest> generateKeys() {
    List<DataRequest> keys = new ArrayList<>();
    // use key object which is passed in contructor to make HTTP call to another service
    // and then make List of DataRequest object and return keys.
    return keys;
  }

  /** Inner class for the subtasks. */
  private static class DataRequestTask extends RecursiveTask<DataResponse> {
    private final DataRequest request;

    public DataRequestTask(DataRequest request) {
      this.request = request;
    }

    @Override
    protected DataResponse compute() {
      return performDataRequest(this.request);
    }

    private DataResponse performDataRequest(DataRequest key) {
      MappingHolder mappings = DataMapping.getMappings(key.getType());
      List<String> hostnames = mappings.getAllHostnames(key);

      for (String hostname : hostnames) {
        String url = generateUrl(hostname);
        HttpGet httpGet = new HttpGet(url);
        httpGet.setConfig(generateRequestConfig());
        httpGet.addHeader(key.getHeader());

        try (CloseableHttpResponse response = httpClientBuilder.execute(httpGet)) {
          HttpEntity entity = response.getEntity();
          String responseBody =
              TestUtils.isEmpty(entity) ? null : IOUtils.toString(entity.getContent(),
                  StandardCharsets.UTF_8);

          return new DataResponse(responseBody, DataErrorEnum.OK, DataStatusEnum.OK);
        } catch (IOException ex) {
          // log error
        }
      }
      return new DataResponse(DataErrorEnum.SERVERS_DOWN, DataStatusEnum.ERROR);
    }
  }
}

每个DataRequest对象都有一个DataResponse对象。现在一旦有人通过传递DataRequest对象调用我们的库,我们在内部创建List<DataRequest>对象,然后我们DataRequest并行调用每个对象并返回列表List<DataResponse>中的每个DataResponse对象将对相应DataRequest对象产生响应的位置。

下面是流程:

  • 客户将通过传递对象来调用DataClient类。DataRequest他们可以根据自己的要求调用executeSync()或方法。executeAsync()
  • 现在在DataFetcherTask类(它是子类型RecursiveTask之一ForkJoinTask's)中,给定一个key对象,它是一个单一的DataRequest,我将生成List<DataRequest>然后为DataRequest列表中的每个对象并行调用每个子任务。这些子任务ForkJoinPool与父任务一样执行。
  • 现在在DataRequestTask课堂上,我DataRequest通过创建一个 URL 来执行每个对象并返回它的DataResponse对象。

问题陈述:

由于这个库是在一个非常高吞吐量的环境中调用的,所以它必须非常快。对于同步调用,这里可以在单独的线程中执行吗?在这种情况下,它会产生线程的额外成本和资源以及线程上下文切换的成本,所以我有点困惑。我也在ForkJoinPool这里使用这将节省我使用额外的线程池,但它在这里是正确的选择吗?

有没有更好、更有效的方法来做同样的事情,也可以提高性能?我正在使用 Java 7 并且也可以访问 Guava 库,所以如果它可以简化任何事情,那么我也对此持开放态度。

当它在非常重的负载下运行时,我们似乎看到了一些争用。当在非常重的负载下运行时,这段代码有没有办法进入线程争用?

4

2 回答 2

0

我认为在您的情况下,最好使用异步 http 调用,请参阅链接:HttpAsyncClient。而且您不需要使用线程池。

在 executeAsync 方法中创建空的 CompletableFuture<DataResponse>() 并将其传递给客户端调用,在回调调用中通过调用 complete 来设置 completableFuture 的结果(如果引发异常,则为 completeExceptionally)。ExecuteSync 方法实现看起来不错。

编辑:

对于 java 7,它只需要替换一个 completableFuture 来承诺在 guava 中的实现,比如 ListenableFuture 或类似的东西

于 2017-03-09T06:53:43.977 回答
0

使用的选择ForkJoinPool是正确的,它旨在提高许多小任务的效率:

ForkJoinPool 与其他类型的 ExecutorService 的不同之处主要在于采用了工作窃取:池中的所有线程都尝试查找并执行提交到池和/或由其他活动任务创建的任务(如果不存在,则最终阻塞等待工作) . 当大多数任务产生其他子任务(大多数 ForkJoinTasks 也是如此)时,以及当许多小任务从外部客户端提交到池时,这可以实现高效处理。尤其是在构造函数中将 asyncMode 设置为 true 时,ForkJoinPools 也可能适用于从未加入的事件式任务。

我建议在构造函数中尝试,asyncMode = true因为在您的情况下,任务从未加入:

public class DataClient implements Client {
    private final ForkJoinPool forkJoinPool = new ForkJoinPool(16, ForkJoinPool.ForkJoinWorkerThreadFactory, null, true);
...
}

对于executeSync()您可以使用forkJoinPool.invoke(task),这是在池中执行同步任务以进行资源优化的托管方式:

@Override
public List<DataResponse> executeSync(DataRequest key) {
  DataFetcherTask task = new DataFetcherTask(key, this.httpClientBuilder);
  return this.forkJoinPool.invoke(task);
}

如果您可以使用 Java 8,那么已经优化了一个公共池:ForkJoinPool.commonPool()

于 2017-03-09T08:12:45.960 回答