
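I only started learning multithreaded programming today because my project needs it.

I have a string-processing task that divides nicely into small subtasks.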

while (...){
    ...
    // assign task for handler
    Thread t = new Thread(new PCHandler(counter,pc));
    t.start();
    counter++;
}

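The problem is that I need about 500K threads to complete this task, and I get this error:

Caused by: java.lang.OutOfMemoryError: unable to create new native thread

I searched online, and it seems the JVM only lets me create at most about 32K threads. There are instructions for raising this limit by modifying configuration files, but I want to avoid modifying users' machines. Could you suggest how to manage the threads sensibly within the limit?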


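"The problem is that I need about 500K threads to complete this task. I got [a memory error]."

In my opinion, you should use a thread pool: that way you can submit a large number of jobs but run them on a much smaller number of threads.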

// create a thread pool with 10 threads, this can be optimized to your hardware
ExecutorService threadPool = Executors.newFixedThreadPool(10);
// submit your handlers to the thread-pool
for (PCHandler handler : handlersToDo) {
    threadPool.submit(handler);
}
// once we have submitted all jobs to the thread pool, it should be shutdown
threadPool.shutdown();
...
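A self-contained sketch of the same idea, assuming each of the 500K subtasks is a small independent job whose result can be collected: the jobs are queued on a pool of a few threads instead of each getting its own thread. `processChunk` is a hypothetical stand-in for the question's `PCHandler`, whose code isn't shown, and the pool size of 4 is an arbitrary choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PoolDemo {

    // Hypothetical stand-in for PCHandler: a tiny piece of string work for job `id`.
    static int processChunk(int id) {
        return ("chunk-" + id).length();
    }

    // Runs `jobCount` jobs on a fixed pool of `poolSize` threads and sums their results.
    static long runAll(int jobCount, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<Integer>> results = new ArrayList<>(jobCount);
            for (int i = 0; i < jobCount; i++) {
                final int id = i;                          // copy for the lambda
                results.add(pool.submit(() -> processChunk(id)));
            }
            long total = 0;
            for (Future<Integer> f : results) {
                total += f.get();                          // waits for that job to finish
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for jobs", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a job failed", e.getCause());
        } finally {
            pool.shutdown();                               // no new jobs; lets the worker threads exit
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll(500_000, 4));
    }
}
```

Only `poolSize` threads are ever created, however many jobs are submitted; the rest wait in the pool's internal (unbounded) queue until a thread is free.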

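If that doesn't work, I'd like to know more about a system that really needs 500k concurrently running threads. You might get there with some memory-setting tweaks and more RAM in your machine, but I suspect re-architecting your application is in order.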

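As @Peter mentioned in the comments, to optimize the number of threads in the pool you can look up the number of available processors and other system specs. But it depends heavily on how CPU-intensive your PCHandler class is: the more IO it does, the more concurrency you can exploit. A few test runs with different values passed to the newFixedThreadPool(...) method are probably the best way to determine the optimal setting.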
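One common starting point for that sizing question is the rule of thumb from Goetz et al., "Java Concurrency in Practice": threads ≈ cores × target CPU utilization × (1 + wait time / compute time). The sketch below only evaluates that formula; the wait and compute times are inputs you would have to measure for `PCHandler` yourself, and the numbers in `main` are made up for illustration. Test runs are still needed to confirm the result.

```java
class PoolSizing {

    // Rule of thumb from "Java Concurrency in Practice":
    //   threads = cores * targetUtilization * (1 + waitTime / computeTime)
    // waitMillis and computeMillis are per-job measurements you would take yourself.
    static int suggestedPoolSize(int cores, double targetUtilization,
                                 double waitMillis, double computeMillis) {
        if (cores < 1 || targetUtilization <= 0 || targetUtilization > 1
                || waitMillis < 0 || computeMillis <= 0) {
            throw new IllegalArgumentException("invalid sizing input");
        }
        double threads = cores * targetUtilization * (1 + waitMillis / computeMillis);
        return Math.max(1, (int) Math.round(threads));   // always at least one thread
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // CPU-bound job: no waiting, so roughly one thread per core.
        System.out.println(suggestedPoolSize(cores, 1.0, 0, 10));
        // I/O-heavy job (illustrative numbers): 90 ms waiting per 10 ms of CPU, about 10 threads per core.
        System.out.println(suggestedPoolSize(cores, 1.0, 90, 10));
    }
}
```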

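Also, depending on how large the 500k job objects are, you may want to throttle their creation. To do that, you can create a thread pool with a bounded queue, which limits the number of jobs that can be outstanding at any one time.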
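A sketch of that bounded setup using `ThreadPoolExecutor` directly. It assumes `CallerRunsPolicy` as the overflow strategy: when the queue is full, the submitting thread runs the job itself, which also slows down job creation. The job body (a short `parkNanos` pause) and the sizes are placeholders; the method reports the peak number of outstanding jobs so you can see the bound hold.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

class BoundedPoolDemo {

    // Submits `jobCount` jobs to `threads` workers backed by a queue holding at most `capacity` jobs.
    // Returns the peak number of jobs that were submitted but not yet finished.
    static int peakOutstanding(int jobCount, int threads, int capacity) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(capacity),           // bounded: at most `capacity` waiting jobs
                new ThreadPoolExecutor.CallerRunsPolicy());   // queue full -> caller runs the job itself
        AtomicInteger outstanding = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < jobCount; i++) {
            peak.accumulateAndGet(outstanding.incrementAndGet(), Math::max);
            pool.execute(() -> {
                LockSupport.parkNanos(10_000);                // placeholder for real work
                outstanding.decrementAndGet();
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // Bounded by 2 running + 8 queued + 1 handled by the caller.
        System.out.println(peakOutstanding(10_000, 2, 8));
    }
}
```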

answered 2013-10-02T13:08:29.823

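Unless it's a machine with 16 or more cores, managing that many threads on a single machine from a single application is definitely not a good option.

Consider factors such as whether your work is I/O-bound or CPU-bound, and choose accordingly. Read more here and here.

I usually use: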

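import java.util.concurrent.*;

int maxThreadCount = Runtime.getRuntime().availableProcessors();
ExecutorService executor =
    new ThreadPoolExecutor(
        0, Math.max(1, maxThreadCount - 1),             // max pool size must be >= 1, even on a single-core machine
        1, TimeUnit.SECONDS,                            // idle threads are released after 1 second
        new LinkedBlockingDeque<>(maxThreadCount * 2),  // bounded queue; with core size 0, extra threads start only once it is full
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.CallerRunsPolicy());     // queue full and all threads busy: the submitting thread runs the task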

Then submit the tasks for processing and wait for everything to finish:

while (moreTasksToDo) {
    Callable<?> c = ...;
    executor.submit(c);
}
executor.shutdown();                                               // stop accepting new tasks
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);  // block until all submitted tasks finish

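With Java 8+, you can also consider doing this more efficiently.

I ran a small benchmark myself. The code below is inspired by an article; you can read more in the Java 8 Handbook.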

Consider this function, which computes a total:
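Every approach below computes the same sum, so each printed Total can be checked against the closed form of an arithmetic series (Approach 2's iterator also visits i = 0, which adds nothing):

```latex
\text{Total} = \sum_{i=1}^{\mathrm{LIMIT}-1} i \cdot \mathrm{FACTOR}
             = \mathrm{FACTOR} \cdot \frac{(\mathrm{LIMIT}-1)\,\mathrm{LIMIT}}{2}
```

For the two data sets used further down: FACTOR = 1, LIMIT = 10000 gives 9999 · 10000 / 2 = 49,995,000; FACTOR = 9876, LIMIT = 1000000 gives 9876 · 999999 · 1000000 / 2 = 4,937,995,062,000,000, which still fits in a `long` (maximum about 9.22 × 10^18).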

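import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Approach 1: old school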
private static void findingTotalOldSchool()  {
    long total = 0;
    long start = System.nanoTime();

    for (long i = 1; i < LIMIT; i++) {
        total = total + (i * FACTOR);
    }

    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.println("Duration: "+duration);
    System.out.println("Total: "+total);
}

public static Range range(int max)  {
    return new Range(max);
}

// Approach 2: custom iterator
private static void findingTotalCustomIterator() {
    long total = 0;
    long start = System.nanoTime();

    for (long i : range(LIMIT)) {
        total = total + i * FACTOR;
    }

    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.println("Duration: "+duration);
    System.out.println("Total: "+total);
}

// Approach 3: using streams
private static void findingTotalStream() {
    long start = System.nanoTime(); 
    long total = 0;

    total = LongStream.range(1, LIMIT)
            .map(t -> t * FACTOR)
            .sum();

    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.println("Duration: "+duration);
    System.out.println("Total: "+total);
}

// Approach 4: using parallel streams
private static void findingTotalParallelStream() {
    long start = System.nanoTime(); 
    long total = 0;

    total = LongStream.range(1, LIMIT)
            .parallel()
            .map(t -> t * FACTOR)
            .sum();

    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.println("Duration: "+duration);
    System.out.println("Total: "+total);
}

// Approach 5: Using Completable Futures alone
private static void findingTotalCFS() {
    long start = System.nanoTime();

    // Without an executor argument, supplyAsync runs on the default async pool (normally ForkJoinPool.commonPool())
    List<CompletableFuture<Long>> futures =
            LongStream.range(1, LIMIT).boxed()
            .map(t -> CompletableFuture.supplyAsync(() -> t * FACTOR))
            .collect(Collectors.toList());
    // Code here could run ahead of the tasks, hence joining on the futures
    long total = futures.stream().map(CompletableFuture::join).mapToLong(t -> t).sum();

    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.println("Futures used: " + futures.size());
    System.out.println("Duration: " + duration);
    System.out.println("Total: " + total);
}

// Approach 6: Using Completable Futures managed by Executor Service
private static void findingTotalCFSE() {
    long start = System.nanoTime();

    ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
    List<CompletableFuture<Long>> futures =
            LongStream.range(1, LIMIT).boxed()
            .map(t -> CompletableFuture.supplyAsync(() -> t * FACTOR, executor))
            .collect(Collectors.toList());

    long total = futures.stream().map(CompletableFuture::join).mapToLong(t -> t).sum();
    executor.shutdownNow();

    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.println("Futures used: " + futures.size());
    System.out.println("Duration: " + duration);
    System.out.println("Total: " + total);
}

// Approach 7: Using Executor service alone
private static void findingTotalES() {
    long start = System.nanoTime();

    ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
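    long total = LongStream
        .range(1, LIMIT)
        .boxed()
        // The stream is lazy and sequential: each task is submitted and then waited on
        // (future.get()) before the next one is submitted, so the jobs effectively run one at a time.
        .map((i) -> executorService.submit(new Operation(i, FACTOR)))
        .map((Future<Long> future) -> {
            try {
                return future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } catch (ExecutionException e) {
                // Unwrap the exception actually thrown by the task instead of silently returning 0
                throw new IllegalStateException(e.getCause());
            }
        })
        .mapToLong(Long::longValue)
        .sum();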

    executorService.shutdown();

    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.println("Duration: "+duration);
    System.out.println("Total: "+total);
}

class Operation implements Callable<Long> {

    long i; int j;
    Operation(long i, int j) { this.i = i; this.j = j; }

    @Override
    public Long call() {
        return i * j;
    }
}


class Range implements Iterable<Integer> {

    private int limit;

    public Range(int limit) {
        this.limit = limit;
    }

    @Override
    public Iterator<Integer> iterator() {
        final int max = limit;
        return new Iterator<Integer>() {

            private int current = 0;

            @Override
            public boolean hasNext() {
                return current < max;
            }

            @Override
            public Integer next() {
                if (hasNext()) {
                    return current++;   
                } else {
                    throw new NoSuchElementException("Range reached the end");
                }
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException("Can't remove values from a Range");
            }
        };
    }
}

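We did test runs with 2 sets of data. Each test should be run on its own, not as part of a single combined run, because the results can change as the JVM optimizes the code.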

//first run
final static int FACTOR = 1;
final static int LIMIT = 10000;

//second run
final static int FACTOR = 9876;
final static int LIMIT = 1000000;


System.out.println("-----Traditional Loop-----");
findingTotalOldSchool();
// 0 ms
// 4 ms     

System.out.println("-----Custom Iterator----");
findingTotalCustomIterator();
// 1 ms
// 15 ms


System.out.println("-----Streams-----");
findingTotalStream();
// 38 ms
// 33 ms        


System.out.println("-----Parallel Streams-----");
findingTotalParallelStream();
// 29 ms
// 64 ms


System.out.println("-----Completable Futures with Streams-----");
findingTotalCFS();
// 77 ms
// 635 ms       


System.out.println("-----Executor Service with Streams-----");
findingTotalES();
// 323 ms
// 12632 ms

System.out.println("-----Completable Futures with Executor Service with Streams-----");
findingTotalCFSE();
// 77 ms
// 844 ms   
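These timings come from single, mostly cold runs, so JIT compilation and other start-up costs are part of what they measure. A minimal sketch of a warm-up step before timing follows; `timeAfterWarmup` and the warm-up count are illustrative assumptions. For serious measurements, a dedicated harness such as JMH (the OpenJDK Java Microbenchmark Harness) handles warm-up, forking and dead-code elimination far more carefully.

```java
import java.util.function.LongSupplier;

class WarmBench {

    // Results are accumulated here, which makes it harder for the JIT to discard the measured work.
    static long sink;

    // Runs `work` `warmups` times untimed (giving the JIT a chance to compile the hot path),
    // then times a single further call. Returns elapsed nanoseconds.
    static long timeAfterWarmup(LongSupplier work, int warmups) {
        for (int i = 0; i < warmups; i++) {
            sink += work.getAsLong();
        }
        long start = System.nanoTime();
        sink += work.getAsLong();
        return System.nanoTime() - start;
    }

    // Same computation as "Approach 1", returning the total instead of printing it.
    static long oldSchoolTotal(int limit, int factor) {
        long total = 0;
        for (long i = 1; i < limit; i++) {
            total += i * factor;
        }
        return total;
    }

    public static void main(String[] args) {
        long nanos = timeAfterWarmup(() -> oldSchoolTotal(1_000_000, 9876), 20);
        System.out.println("Duration: " + nanos / 1_000_000 + " ms");
    }
}
```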

Observations:

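  • In most cases, the traditional loop is fast.
  • Use parallel streams when performance or IO operations are involved.
  • For simple iterations (replacements or simple numeric calculations), use a traditional loop.
  • Completable Futures with an Executor Service are flexible and are an option when you need more control over the number of threads and similar settings. If your work is complex, go for a higher-level system that helps you distribute it horizontally, such as Akka or Vert.x.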
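One likely reason Approaches 5 and 6 trail the plain loop is that they create one `CompletableFuture` per number (999,999 of them in the second run), so scheduling overhead dwarfs the single multiplication each task performs. A sketch of the more controlled variant described in the last point: a fixed executor plus one future per chunk of the range. The chunk count is an assumed tuning knob, not something measured here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ChunkedFutures {

    // Sums i * factor for i in [1, limit) using one CompletableFuture per chunk
    // (instead of one per element) on a fixed pool sized to the machine.
    static long total(int limit, int factor, int chunks) {
        if (chunks < 1) {
            throw new IllegalArgumentException("chunks must be >= 1");
        }
        ExecutorService executor =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            long count = Math.max(0, limit - 1L);                    // number of elements in [1, limit)
            long step = Math.max(1, (count + chunks - 1) / chunks);  // ceil(count / chunks)
            List<CompletableFuture<Long>> parts = new ArrayList<>();
            for (long lo = 1; lo < limit; lo += step) {
                long from = lo;
                long to = Math.min(limit, lo + step);                // exclusive upper bound
                parts.add(CompletableFuture.supplyAsync(() -> {
                    long sum = 0;
                    for (long i = from; i < to; i++) {
                        sum += i * factor;
                    }
                    return sum;
                }, executor));
            }
            return parts.stream().map(CompletableFuture::join).mapToLong(Long::longValue).sum();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(total(1_000_000, 9876, 16));
    }
}
```

With 16 chunks, only 16 tasks are scheduled for the whole range, so each task does enough work to outweigh its scheduling cost.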
answered 2019-06-10T18:42:10.343