9

我必须使用不同参数多次使用 RestTemplate 进行 Rest API 调用。API 是相同的,但它是正在更改的参数。次数也是可变的。我想使用 AsyncRestTemplate 但我的主线程应该等到所有 API 调用都成功完成。我还想处理每个 API 调用返回的响应。目前我正在使用 RestTemplate。基本形式如下。

List<String> listOfResponses = new ArrayList<String>();
for (Integer studentId : studentIdsList) {
    String respBody;
    try {
        ResponseEntity<String> responseEntity = restTemplate.exchange(url, method, requestEntity, String.class);
    } catch (Exception ex) {
        throw new ApplicationException("Exception while making Rest call.", ex);
    }
    respBody = requestEntity.getBody();
    listOfResponses.add(respBody);          
}

在这种情况下如何实现 AsyncRestTemplate?

4

3 回答 3

12

使用时的主要思想AsyncRestTemplate(或任何异步 API,实际上)是在第一次发送所有请求,保留相应的期货,然后在第二次处理所有响应。您可以简单地使用 2 个循环来执行此操作:

List<ListenableFuture<ResponseEntity<String>>> responseFutures = new ArrayList<>();
for (Integer studentId : studentIdsList) {
    // FIXME studentId is not used
    ListenableFuture<ResponseEntity<String>> responseEntityFuture = restTemplate.exchange(url, method, requestEntity, String.class);
    responseFutures.add(responseEntityFuture);
}
// now all requests were send, so we can process the responses
List<String> listOfResponses = new ArrayList<>();
for (ListenableFuture<ResponseEntity<String>> future: responseFutures) {
    try {
        String respBody = future.get().getBody();
        listOfResponses.add(respBody);
    } catch (Exception ex) {
        throw new ApplicationException("Exception while making Rest call.", ex);
    }
}

注意:如果您需要将响应与原始请求配对,您可以将 future 列表替换为 map 或 request+response 对象列表。

我还注意到studentId您的问题中没有使用它。

于 2017-06-12T08:55:16.900 回答
5

如果这对您可行,您可以使用 Java 8 Stream API:

List<String> listOfResponses = studentIdsList.stream()
    .parrallel()
    .map({studentId ->
        ResponseEntity<String> responseEntity = restTemplate.exchange(url, method, studentId, String.class);
        return responseEntity.getBody();
    })
    .collect(Collectors.toList());

这段代码基本上会执行两件事:

  1. 并行执行请求;
  2. 将请求的结果收集到一个列表中。

更新:同意@Didier L - 当您需要执行大量请求时,此解决方案可能无法正常工作。这是一个更新的版本:

List<String> listOfResponses  = studentIdsList.stream()
                .map(studentId -> asyncRestTemplate.exchange(url, method, studentId, String.class)
                .collect(Collectors.toList()).stream()
                .map(this::retrieveResult)
                .collect(Collectors.toList());

    /**
     * Retrieves results of each request by blocking the main thread. Note that the actual request was performed on the previous step when
     * calling asyncRestTemplate.exchange(url, method, studentId, String.class)
     */
    private String retrieveResult(ListenableFuture<ResponseEntity<String>> listenableFuture) {
        try {
            return listenableFuture.get().getBody();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
于 2017-06-12T10:58:05.253 回答
2

这是我想建议的另一个解决方案,它使用 Spring 的 RestTemplate 而不是 AsyncRestTemplate。它还使用 Java 8 CompletableFuture。

public void sendRequestsAsync(List<Integer> studentList) {
    List<CompletableFuture<Void>> completableFutures = new ArrayList<>(studentList.size()); //List to hold all the completable futures
    List<String> responses = new ArrayList<>(); //List for responses
    ExecutorService yourOwnExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    for (Integer studentId : studentList) { //Iterate student list
        CompletableFuture<Void> requestCompletableFuture = CompletableFuture
                .supplyAsync(
                        () -> restTemplate.exchange("URL/" + studentId, HttpMethod.GET, null, String.class),
                        yourOwnExecutor
                )//Supply the task you wanna run, in your case http request
                .thenApply((responseEntity) -> {
                    responses.add(responseEntity.getBody());
                    return responseEntity;
                })//now you can add response body to responses
                .thenAccept((responseEntity) -> {
                    doSomeFinalStuffWithResponse(responseEntity);
                })//here you can do more stuff with responseEntity (if you need to)
                .exceptionally(ex -> {
                    System.out.println(ex);
                    return null;
                });//do something here if an exception occurs in the execution;

        completableFutures.add(requestCompletableFuture);
    }

    try {
        CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()])).get(); //Now block till all of them are executed by building another completablefuture with others.
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}

我更喜欢这个解决方案,因为我可以链接尽可能多的业务逻辑,并且不必依赖 Spring 的内部来进行异步发送。显然你可以更多地清理代码,我现在还没有太在意。

于 2017-06-12T22:23:05.883 回答