我目前正在学习使用 java.util.concurrent 包提供的 Java 并发特性。作为一个练习,我尝试编写一个小程序来测试 HTTP API 的性能。但不知何故,我的程序并没有经常正确终止。它甚至使我的操作系统崩溃。
以下是我的程序的伪代码:
- 实例化查询 HTTP API 的请求对象(在示例中我只查询一个随机站点)。
- 实例化多个Callables,其中每个代表一个代表一个Http Call。
- 遍历 Callables 并通过 ScheduledExecutorService 安排它们(可以在代码开头配置每秒应执行的请求数)。
- 在安排好所有 Callables 之后,我开始迭代 Futures。如果期货完成,则检索响应。每秒都这样做。如果没有新的 Future 完成,退出循环。
我详细遇到了什么问题?
- 很多时候,程序没有正确完成。我在控制台中看到了所有的日志打印,就好像程序正在正确完成一样。但实际上我看到 eclipse 中的停止按钮仍然处于活动状态。如果我单击它,它表示程序无法正确终止。无论我等待多长时间,它都不会完成(注意:我正在 eclipse 中启动程序)。
- 如果我增加请求的数量,我可以很容易地引发错误。如果我达到 2000 年,这肯定会发生。如果它发生我的操作系统甚至崩溃,我仍然可以使用 eclipse,但其他应用程序不再工作了。
- 我的环境是 Mac OS X 10.7 上的 Eclipse 3.7,带有 Java 1.6 和 Apache httpclient 4.2.2
您在我的代码中发现任何重大错误吗?在我的操作系统崩溃并且根本看不到任何异常的情况下,我从未在 java 程序中遇到过此类问题。
编码:
public class ConcurrentHttpRequestsTest {
/**
* @param args
*/
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(25);
Integer standardTimeout = 5000;
Float numberOfRequestsPerSecond = 50.0f;
Integer numberOfRequests = 500;
Integer durationBetweenRequests = Math.round(1000 / numberOfRequestsPerSecond);
// build Http Request
HttpGet request = null;
request = new HttpGet("http://www.spiegel.de");
// request.addHeader("Accept", "application/json");
HttpParams params = new BasicHttpParams();
HttpConnectionParams.setConnectionTimeout(params, standardTimeout);
HttpConnectionParams.setSoTimeout(params, standardTimeout);
request.setParams(params);
// setup concurrency logic
Collection<Callable<Long>> callables = new LinkedList<Callable<Long>>();
for (int i = 1; i <= numberOfRequests; i++) {
HttpClient client = new DefaultHttpClient();
callables.add(new UriCallable(request, client));
}
// start performing requests
int i = 1;
Collection<Future<Long>> futures = new LinkedList<Future<Long>>();
for (Callable<Long> callable : callables) {
ScheduledFuture<Long> future = scheduledExecutorService.schedule(callable, i * durationBetweenRequests, TimeUnit.MILLISECONDS);
futures.add(future);
i++;
}
// process futures (check wether they are ready yet)
Integer maximumNoChangeCount = 5;
boolean futuresAreReady = false;
int noChangeCount = 0;
int errorCount = 0;
List<Long> responses = new LinkedList<Long>();
while (!futuresAreReady) {
boolean allFuturesAreDone = true;
boolean atLeast1FutureIsDone = false;
Iterator<Future<Long>> iterator = futures.iterator();
while (iterator.hasNext()) {
Future<Long> future = iterator.next();
allFuturesAreDone = allFuturesAreDone && (future.isDone());
if (future.isDone()) {
try {
atLeast1FutureIsDone = true;
responses.add(future.get());
iterator.remove();
} catch (Exception e) {
// remove failed futures (e.g. timeout)
// System.out.println("Reached catch of future.get()" +
// e.getClass() + " " + e.getCause().getClass() + " " +
// e.getMessage());
iterator.remove();
errorCount++;
}
}
if (future.isCancelled()) {
// this code is never reached. Just here to make sure that
// this is not the cause of problems.
System.out.println("Found a cancelled future. Will remove it.");
iterator.remove();
}
}
if (!atLeast1FutureIsDone) {
System.out.println("At least 1 future was not done. Current noChangeCount:" + noChangeCount);
noChangeCount++;
} else {
// reset noChangeCount
noChangeCount = 0;
}
futuresAreReady = allFuturesAreDone;
// log the current state of responses, errors and remaining futures
System.out.println("Size of responses :" + responses.size() + "; Size of futures:" + futures.size() + " Errors:" + errorCount);
if (noChangeCount >= maximumNoChangeCount) {
System.out.println("Breaking while loop becauce no new future finished in the last " + maximumNoChangeCount + " iterations");
break;
}
// check every second
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for (Long responsetime : responses) {
// analyze responsetimes or whatever
}
// clean up
// .shutdown() made even more problems than shutdownNow()
scheduledExecutorService.shutdownNow();
System.out.println("Executors have been shutdown - Main Method finished. Will exit System.");
System.out.flush();
System.exit(0);
}
private static class UriCallable implements Callable<Long> {
private HttpUriRequest request;
private HttpClient client;
public UriCallable(HttpUriRequest request, HttpClient client) {
super();
this.request = request;
this.client = client;
}
public Long call() throws Exception {
Long start = System.currentTimeMillis();
HttpResponse httpResponse = client.execute(request);
Long end = System.currentTimeMillis();
return end - start;
}
}
}