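Unless the machine has 16 or more cores, managing that many threads from a single application on a single machine is definitely not a good choice.
Consider factors such as whether your work is I/O-bound or CPU-bound, and choose accordingly. Read more here and here.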
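One widely cited rule of thumb for that choice, from Brian Goetz's Java Concurrency in Practice, is N_threads = N_cpu × U_cpu × (1 + W/C), where U_cpu is the target CPU utilization and W/C is the ratio of time a task spends waiting (e.g. on I/O) to time it spends computing. The sketch below only evaluates that formula. The helper name and the sample 9:1 wait/compute ratio are hypothetical; in practice W/C has to be measured for your own workload.

```java
public class PoolSizing {

    /**
     * Hypothetical helper: rule-of-thumb pool size.
     * cpus          - number of available processors
     * targetUtil    - desired CPU utilization, 0 < targetUtil <= 1
     * waitToCompute - estimated ratio of waiting time (e.g. I/O) to compute time
     */
    static int suggestedPoolSize(int cpus, double targetUtil, double waitToCompute) {
        if (cpus < 1 || targetUtil <= 0 || targetUtil > 1 || waitToCompute < 0) {
            throw new IllegalArgumentException("invalid sizing parameters");
        }
        long size = Math.round(cpus * targetUtil * (1 + waitToCompute));
        return (int) Math.max(1, size); // always at least one thread
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        // CPU-bound work: almost no waiting, so roughly one thread per core
        System.out.println("CPU-bound: " + suggestedPoolSize(cpus, 1.0, 0.0));
        // I/O-bound work (assumption: tasks wait about 9x longer than they compute)
        System.out.println("I/O-bound: " + suggestedPoolSize(cpus, 1.0, 9.0));
    }
}
```

For CPU-bound work this reduces to roughly one thread per core; the more time tasks spend blocked, the more threads it takes to keep the CPUs busy.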
I usually use:
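import java.util.concurrent.*;

int maxThreadCount = Runtime.getRuntime().availableProcessors();
ExecutorService executor =
    new ThreadPoolExecutor(
        0,                                     // core pool size: idle threads may all time out
        Math.max(1, maxThreadCount - 1),       // max pool size; must be >= 1, even on a single-core machine
        1, TimeUnit.SECONDS,                   // keep-alive time for idle threads
        new LinkedBlockingDeque<>(maxThreadCount * 2),  // bounded work queue
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.CallerRunsPolicy());     // when saturated, the submitting thread runs the task itself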
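To see what CallerRunsPolicy does once a pool is saturated, here is a small deterministic sketch. It deliberately uses a tiny pool (1 thread, queue capacity 1) instead of the sizes above so the overflow is guaranteed; the class and method names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CallerRunsDemo {

    /** Returns the name of the thread that ran the task which overflowed the pool. */
    static String threadOfOverflowTask() {
        // Deliberately tiny pool: 1 thread, queue capacity 1
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> overflowThread = new AtomicReference<>();
        try {
            // Task A occupies the only worker thread until the latch is released
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Task B fills the single queue slot
            pool.execute(() -> { });
            // Task C: worker busy, queue full, max size reached -> CallerRunsPolicy
            // runs it synchronously on the submitting thread
            pool.execute(() -> overflowThread.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return overflowThread.get();
    }

    public static void main(String[] args) {
        System.out.println("Caller thread:         " + Thread.currentThread().getName());
        System.out.println("Overflow task ran on:  " + threadOfOverflowTask());
    }
}
```

Both lines print the same thread name. This is the back-pressure the configuration above relies on: instead of rejecting work when the bounded queue is full, the submitting loop is slowed down by running a task itself.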
Now submit the tasks for processing, and wait for all of them to finish:
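while (moreTasksToDo) {
    Callable<?> c = ...;   // your task
    executor.submit(c);
}
executor.shutdown();   // stop accepting new tasks; already-submitted tasks still run
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);   // blocks; throws InterruptedException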
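The loop above is pseudocode: the loop condition and the task are placeholders. Below is a self-contained sketch that fills them in with a hypothetical task (multiply each number by a factor) and keeps the returned Futures so the results can be combined. Because it waits on every Future, a separate awaitTermination call is not needed here; the class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class SubmitAndWait {

    /** Sums i * factor for i in [1, limit), one task per value, using the pool shown above. */
    static long sumWithPool(int limit, int factor) {
        int cpus = Runtime.getRuntime().availableProcessors();
        ExecutorService executor = new ThreadPoolExecutor(
                0, Math.max(1, cpus - 1),
                1, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(cpus * 2),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        List<Future<Long>> futures = new ArrayList<>();
        try {
            for (long i = 1; i < limit; i++) {
                final long n = i;                       // lambdas need an effectively final copy
                Callable<Long> task = () -> n * factor; // hypothetical task
                futures.add(executor.submit(task));     // may run on the caller if the pool is saturated
            }
        } finally {
            executor.shutdown(); // no new tasks; already-submitted tasks still run
        }
        long total = 0;
        try {
            for (Future<Long> f : futures) {
                total += f.get(); // blocks until this task has completed
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("Total: " + sumWithPool(10_000, 1)); // prints "Total: 49995000"
    }
}
```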
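With Java 8+, you can consider more efficient ways of doing this.
I ran a small benchmark myself. The following code was inspired by an article; you can read more in the Java 8 handbook.
Consider these different ways of computing a total: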
// Approach 1: old school
private static void findingTotalOldSchool() {
long total = 0;
long start = System.nanoTime();
for (long i = 1; i < LIMIT; i++) {
total = total + (i * FACTOR);
}
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Duration: "+duration);
System.out.println("Total: "+total);
}
public static Range range(int max) {
return new Range(max);
}
// Approach 2: custom iterator
private static void findingTotalCustomIterator() {
long total = 0;
long start = System.nanoTime();
for (long i : range(LIMIT)) {
total = total + i * FACTOR;
}
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Duration: "+duration);
System.out.println("Total: "+total);
}
// Approach 3: using streams
private static void findingTotalStream() {
long start = System.nanoTime();
long total = 0;
total = LongStream.range(1, LIMIT)
.map(t -> t * FACTOR)
.sum();
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Duration: "+duration);
System.out.println("Total: "+total);
}
// Approach 4: using parallel streams
private static void findingTotalParallelStream() {
long start = System.nanoTime();
long total = 0;
total = LongStream.range(1, LIMIT)
.parallel()
.map(t -> t * FACTOR)
.sum();
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Duration: "+duration);
System.out.println("Total: "+total);
}
// Approach 5: Using Completable Futures alone
private static void findingTotalCFS() {
long start = System.nanoTime();
List<CompletableFuture<Long>> futures =
LongStream.range(1, LIMIT).boxed()
.map(t -> CompletableFuture.supplyAsync(() -> t * FACTOR ))
.collect(Collectors.toList());
// Other code here could run ahead before the futures finish, hence the join() on each future below
long total = futures.stream().map(CompletableFuture::join).mapToLong(t->t).sum();
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Futures used: "+futures.size());
System.out.println("Duration: "+duration);
System.out.println("Total: "+total);
}
// Approach 6: Using Completable Futures managed by Executor Service
private static void findingTotalCFSE() {
long start = System.nanoTime();
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
List<CompletableFuture<Long>> futures =
LongStream.range(1, LIMIT).boxed()
.map(t -> CompletableFuture.supplyAsync(() -> {
return t * FACTOR;
}, executor))
.collect(Collectors.toList());
long total = futures.stream().map(CompletableFuture::join).mapToLong(t->t).sum();
executor.shutdownNow();
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Futures used: "+futures.size());
System.out.println("Duration: "+duration);
System.out.println("Total: "+total);
}
// Approach 7: Using Executor service alone
private static void findingTotalES() {
long start = System.nanoTime();
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
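// Note: this stream is sequential, so each element passes through every stage
// before the next one starts: each task is submitted and then immediately
// waited on, which means the tasks run one at a time.
long total = LongStream.range(1, LIMIT)
        .boxed()
        .map(i -> executorService.submit(new Operation(i, FACTOR)))
        .map((Future<Long> future) -> {
            try {
                return future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();   // restore the interrupt flag
                throw new IllegalStateException("Interrupted while waiting for a task", e);
            } catch (ExecutionException e) {
                // Rethrow the task's actual exception instead of silently counting it as 0
                throw new IllegalStateException(e.getCause());
            }
        })
        .mapToLong(Long::longValue)
        .sum();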
executorService.shutdown();
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Duration: "+duration);
System.out.println("Total: "+total);
}
class Operation implements Callable<Long> {
long i; int j;
Operation(long i, int j) { this.i = i; this.j = j; }
@Override
public Long call() {
return i * j;
}
}
class Range implements Iterable<Integer> {
private int limit;
public Range(int limit) {
this.limit = limit;
}
@Override
public Iterator<Integer> iterator() {
final int max = limit;
return new Iterator<Integer>() {
private int current = 0;
@Override
public boolean hasNext() {
return current < max;
}
@Override
public Integer next() {
if (hasNext()) {
return current++;
} else {
throw new NoSuchElementException("Range reached the end");
}
}
@Override
public void remove() {
throw new UnsupportedOperationException("Can't remove values from a Range");
}
};
}
}
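In Approach 7 (findingTotalES), the stream is sequential, so every element travels through the whole pipeline before the next one starts: each task is submitted and then immediately waited on, and the pool's threads never work in parallel. A two-phase alternative, sketched below as my own suggestion rather than something the benchmark measured, submits every task first and only then gathers the results, much as Approaches 5 and 6 collect their futures into a list before joining. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class SubmitAllThenGather {

    /** Sums i * factor for i in [1, limit): submit everything first, then wait. */
    static long sumTwoPhase(int limit, int factor) {
        ExecutorService pool =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
        try {
            // Phase 1: collect() is a terminal operation, so every submit() call
            // happens before anything below waits on a result
            List<Future<Long>> futures = LongStream.range(1, limit)
                    .mapToObj(i -> pool.submit(() -> i * factor))
                    .collect(Collectors.toList());
            // Phase 2: wait for and combine the results
            long total = 0;
            for (Future<Long> f : futures) {
                total += f.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Total: " + sumTwoPhase(10_000, 1)); // prints "Total: 49995000"
    }
}
```

Whichever approach you use, the "Total:" output can be checked against the closed form FACTOR × (LIMIT − 1) × LIMIT / 2, which gives 49,995,000 for FACTOR = 1, LIMIT = 10000 and 4,937,995,062,000,000 for FACTOR = 9876, LIMIT = 1000000.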
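We ran the tests with two sets of data. Each test should be run on its own rather than as part of one combined run, because results can change as the JVM warms up and optimizes (JIT-compiles) the code.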
// Use one pair of values per run.
// First run:
final static int FACTOR = 1;
final static int LIMIT = 10000;
// Second run:
// final static int FACTOR = 9876;
// final static int LIMIT = 1000000;
System.out.println("-----Traditional Loop-----");
findingTotalOldSchool();
// 1st run: 0 ms | 2nd run: 4 ms
System.out.println("-----Custom Iterator----");
findingTotalCustomIterator();
// 1st run: 1 ms | 2nd run: 15 ms
System.out.println("-----Streams-----");
findingTotalStream();
// 1st run: 38 ms | 2nd run: 33 ms
System.out.println("-----Parallel Streams-----");
findingTotalParallelStream();
// 1st run: 29 ms | 2nd run: 64 ms
System.out.println("-----Completable Futures with Streams-----");
findingTotalCFS();
// 1st run: 77 ms | 2nd run: 635 ms
System.out.println("-----Executor Service with Streams-----");
findingTotalES();
// 1st run: 323 ms | 2nd run: 12632 ms
System.out.println("-----Completable Futures with Executor Service with Streams-----");
findingTotalCFSE();
// 1st run: 77 ms | 2nd run: 844 ms
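Each benchmark method above times a single call, so the measurement also includes JIT compilation and class loading. The standard tool for careful Java measurements is JMH (the OpenJDK Java Microbenchmark Harness). Short of that, a hypothetical helper like the one below runs the code several times untimed before measuring and reports the best timed run. It is only a rough sketch: it does not fork JVMs and cannot rule out every JIT optimization.

```java
import java.util.function.LongSupplier;
import java.util.stream.LongStream;

public class WarmupTimer {

    // Writing results to a volatile field makes it harder for the JIT to drop the work as dead code
    private static volatile long blackhole;

    /** Hypothetical helper: `warmups` untimed runs, then the best of `runs` timed runs, in ms. */
    static long bestMillis(LongSupplier work, int warmups, int runs) {
        if (runs < 1) {
            throw new IllegalArgumentException("runs must be >= 1");
        }
        for (int i = 0; i < warmups; i++) {
            blackhole = work.getAsLong(); // untimed warm-up run
        }
        long best = Long.MAX_VALUE;
        for (int i = 0; i < runs; i++) {
            long start = System.nanoTime();
            blackhole = work.getAsLong();
            best = Math.min(best, (System.nanoTime() - start) / 1_000_000);
        }
        return best;
    }

    public static void main(String[] args) {
        final long limit = 1_000_000, factor = 9876;
        long loopMs = bestMillis(() -> {
            long total = 0;
            for (long i = 1; i < limit; i++) {
                total += i * factor;
            }
            return total;
        }, 10, 5);
        long streamMs = bestMillis(
                () -> LongStream.range(1, limit).map(t -> t * factor).sum(), 10, 5);
        System.out.println("loop best: " + loopMs + " ms, stream best: " + streamMs + " ms");
    }
}
```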
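Observations:
- In most cases, the traditional loop is fast; in these runs it was the fastest of all.
- Use parallel streams when performance matters or I/O operations are involved.
- For simple iterations (involving substitution or simple numeric computation), use a traditional loop.
- Completable Futures with Executor Service is flexible, and an option when you need more control over things like the number of threads. If your work is complex, choose a higher-level system that helps you distribute it horizontally, such as Akka or Vert.x.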
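The CompletableFuture benchmarks above create one future per number, so for such tiny tasks the scheduling overhead per future likely dominates. Below is a sketch of the same CompletableFuture + ExecutorService combination with an explicit thread count, where each task handles one contiguous chunk of the range instead of a single value. Chunking is my suggestion here, not something the benchmark measured, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.LongStream;

public class ChunkedFutures {

    /** Sums i * factor for i in [1, limit) using `threads` worker threads, one chunk per thread. */
    static long sumChunked(long limit, long factor, int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads); // explicit thread count
        try {
            long count = Math.max(0, limit - 1);          // number of values in [1, limit)
            long chunk = (count + threads - 1) / threads; // ceiling division
            List<CompletableFuture<Long>> parts = new ArrayList<>();
            for (long from = 1; from < limit; from += chunk) {
                final long lo = from;
                final long hi = Math.min(limit, from + chunk);
                parts.add(CompletableFuture.supplyAsync(
                        () -> LongStream.range(lo, hi).map(t -> t * factor).sum(), executor));
            }
            // join() waits for each chunk and throws an unchecked CompletionException on failure
            return parts.stream().mapToLong(CompletableFuture::join).sum();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        int threads = Runtime.getRuntime().availableProcessors();
        System.out.println("Total: " + sumChunked(1_000_000, 9876, threads)); // prints "Total: 4937995062000000"
    }
}
```

With this layout the number of futures equals the number of threads rather than the number of values, while the thread count stays fully under your control.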