23

我想知道我可以运行的最佳线程数。通常,这等于Runtime.getRuntime().availableProcessors()

但是,在支持超线程的 CPU 上,返回的数字是其两倍。现在,对于某些任务,超线程是好的,但对于其他任务,它什么也没做。就我而言,我怀疑它什么也没做,所以我想知道我是否必须将返回的数字一分为Runtime.getRuntime().availableProcessors()二。

为此,我必须推断 CPU 是否是超线程。因此我的问题是——我怎样才能在 Java 中做到这一点?

谢谢。

编辑

好的,我已经对我的代码进行了基准测试。这是我的环境:

  • Lenovo ThinkPad W510(即具有 4 核和超线程的 i7 CPU),16G RAM
  • Windows 7的
  • 84 个压缩的 CSV 文件,压缩大小从 105M 到 16M
  • 所有文件都在主线程中一一读取 - 没有对 HD 的多线程访问。
  • 每个 CSV 文件行都包含一些数据,这些数据会被解析并通过快速的无上下文测试确定该行是否相关。
  • 每个相关行都包含两个双精度(表示经度和纬度,出于好奇),它们被强制转换为一个Long,然后存储在一个共享的哈希集中。

因此,工作线程不会从 HD 中读取任何内容,但它们会忙于解压缩和解析内容(使用opencsv库)。

下面是代码,没有无聊的细节:

public void work(File dir) throws IOException, InterruptedException {
  Set<Long> allCoordinates = Collections.newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
  int n = 6;
  // NO WAITING QUEUE !
  ThreadPoolExecutor exec = new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
  StopWatch sw1 = new StopWatch();
  StopWatch sw2 = new StopWatch();
  sw1.start();
  sw2.start();
  sw2.suspend();
  for (WorkItem wi : m_workItems) {
    for (File file : dir.listFiles(wi.fileNameFilter)) {
      MyTask task;
      try {
        sw2.resume();
        // The only reading from the HD occurs here:
        task = new MyTask(file, m_coordinateCollector, allCoordinates, wi.headerClass, wi.rowClass);
        sw2.suspend();
      } catch (IOException exc) {
        System.err.println(String.format("Failed to read %s - %s", file.getName(), exc.getMessage()));
        continue;
      }
      boolean retry = true;
      while (retry) {
        int count = exec.getActiveCount();
        try {
          // Fails if the maximum of the worker threads was created and all are busy.
          // This prevents us from loading all the files in memory and getting the OOM exception.
          exec.submit(task);
          retry = false;
        } catch (RejectedExecutionException exc) {
          // Wait for any worker thread to finish
          while (exec.getActiveCount() == count) {
            Thread.sleep(100);
          }
        }
      }
    }
  }
  exec.shutdown();
  exec.awaitTermination(1, TimeUnit.HOURS);
  sw1.stop();
  sw2.stop();
  System.out.println(String.format("Max concurrent threads = %d", n));
  System.out.println(String.format("Total file count = %d", m_stats.getFileCount()));
  System.out.println(String.format("Total lines = %d", m_stats.getTotalLineCount()));
  System.out.println(String.format("Total good lines = %d", m_stats.getGoodLineCount()));
  System.out.println(String.format("Total coordinates = %d", allCoordinates.size()));
  System.out.println(String.format("Overall elapsed time = %d sec, excluding I/O = %d sec", sw1.getTime() / 1000, (sw1.getTime() - sw2.getTime()) / 1000));
}

public class MyTask<H extends CsvFileHeader, R extends CsvFileRow<H>> implements Runnable {
  private final byte[] m_buffer;
  private final String m_name;
  private final CoordinateCollector m_coordinateCollector;
  private final Set<Long> m_allCoordinates;
  private final Class<H> m_headerClass;
  private final Class<R> m_rowClass;

  public MyTask(File file, CoordinateCollector coordinateCollector, Set<Long> allCoordinates,
                Class<H> headerClass, Class<R> rowClass) throws IOException {
    m_coordinateCollector = coordinateCollector;
    m_allCoordinates = allCoordinates;
    m_headerClass = headerClass;
    m_rowClass = rowClass;
    m_name = file.getName();
    m_buffer = Files.toByteArray(file);
  }

  @Override
  public void run() {
    try {
      m_coordinateCollector.collect(m_name, m_buffer, m_allCoordinates, m_headerClass, m_rowClass);
    } catch (IOException e) {
      e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
    }
  }
}

请在结果下方找到(我稍微更改了输出以省略重复部分):

Max concurrent threads = 4
Total file count = 84
Total lines = 56395333
Total good lines = 35119231
Total coordinates = 987045
Overall elapsed time = 274 sec, excluding I/O = 266 sec

Max concurrent threads = 6
Overall elapsed time = 218 sec, excluding I/O = 209 sec

Max concurrent threads = 7
Overall elapsed time = 209 sec, excluding I/O = 199 sec

Max concurrent threads = 8
Overall elapsed time = 201 sec, excluding I/O = 192 sec

Max concurrent threads = 9
Overall elapsed time = 198 sec, excluding I/O = 186 sec

您可以自由得出自己的结论,但我的观点是,在我的具体案例中,超线程确实提高了性能。此外,拥有 6 个工作线程似乎是这项任务和我的机器的正确选择。

4

7 回答 7

5

不幸的是,这在 java 中是不可能的。如果您知道该应用程序将在现代 linux 变体上运行,您可以读取文件 /proc/cpuinfo 并推断是否启用了 HT。

读取此命令的输出可以解决问题:

grep -i "physical id" /proc/cpuinfo | sort -u | wc -l
于 2012-07-31T10:38:37.583 回答
4

没有可靠的方法来确定您是否启用了超线程、关闭了超线程或没有超线程。

相反,更好的方法是在您第一次运行(或每次)时进行第一次校准,运行第一次测试以确定使用哪种方法。

另一种方法是使用所有处理器,即使超线程没有帮助(前提是它不会使代码显着变慢)

于 2012-07-31T11:15:33.130 回答
3

还有一些沉思:

  • 超线程每个代码可能有超过 2 个线程(Sparc 可以有 8 个)
  • 垃圾收集器也需要 CPU 时间才能工作。
  • 超线程可能有助于并发 GC - 也可能不会;或者 JVM 可能会请求成为内核的独占(非超线程)所有者。因此,从长远来看,阻碍 GC 在测试期间获得更好的结果可能会造成伤害。
  • 如果存在缓存未命中,超线程通常很有用,因此 CPU 不会停止而是切换到另一个任务。因此,“是否超线程”将取决于工作负载和 CPU L1/L2 缓存大小/内存速度等。
  • 操作系统可能对/反对某些线程有偏见,并且 Thread.setPriority 可能不被尊重(在 Linux 上通常不被尊重)。
  • 可以设置进程的亲和性,禁止某些内核。因此,在这种情况下,知道有超线程不会有任何显着的优点。

话虽这么说:您应该设置工作线程的大小,并根据架构的具体情况建议如何设置。

于 2012-08-01T10:05:24.270 回答
1

无法从纯 Java 中确定这一点(毕竟逻辑核心是核心,无论是否使用 HT 实现)。请注意,到目前为止提出的解决方案可以解决您的要求(如您所问),但不仅英特尔 CPU 提供了一种超线程形式(Sparc 浮现在脑海中,我相信还有其他的)。

您也没有考虑到,即使确定系统使用 HT,您也无法控制与 Java 内核的线程亲和性。所以你仍然受制于操作系统的线程调度程序。虽然存在一些看似合理的场景,即更少的线程可以更好地执行(因为减少了缓存垃圾),但无法静态确定应该使用多少线程(毕竟所有 CPU 的缓存大小都非常不同(低端的范围从 256KB现在可以合理地预期服务器中> 16MB。而且随着每一代的更新,这必然会改变)。

只需将其设置为可配置设置,任何在不完全了解目标系统的情况下确定这一点的尝试都是徒劳的。

于 2012-07-31T11:48:41.917 回答
1

对于Windows,如果逻辑核心数高于核心数,则您已hyper-threading启用。在此处阅读更多相关信息。

您可以使用wmic以下信息查找此信息:

C:\WINDOWS\system32>wmic CPU Get NumberOfCores,NumberOfLogicalProcessors /Format:List


NumberOfCores=4
NumberOfLogicalProcessors=8

因此,我的系统具有hyper-threading. 逻辑处理器的数量是内核的两倍。

但你可能甚至不需要知道。Runtime.getRuntime().availableProcessors()已经返回逻辑处理器的数量。

获取物理核心数的完整示例(Windows仅):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class PhysicalCores
{
    public static void main(String[] arguments) throws IOException, InterruptedException
    {
        int physicalNumberOfCores = getPhysicalNumberOfCores();
        System.out.println(physicalNumberOfCores);
    }

    private static int getPhysicalNumberOfCores() throws IOException, InterruptedException
    {
        ProcessBuilder processBuilder = new ProcessBuilder("wmic", "CPU", "Get", "NumberOfCores");
        processBuilder.redirectErrorStream(true);
        Process process = processBuilder.start();
        String processOutput = getProcessOutput(process);
        String[] lines = processOutput.split(System.lineSeparator());
        return Integer.parseInt(lines[2]);
    }

    private static String getProcessOutput(Process process) throws IOException, InterruptedException
    {
        StringBuilder processOutput = new StringBuilder();

        try (BufferedReader processOutputReader = new BufferedReader(
                new InputStreamReader(process.getInputStream())))
        {
            String readLine;

            while ((readLine = processOutputReader.readLine()) != null)
            {
                processOutput.append(readLine);
                processOutput.append(System.lineSeparator());
            }

            process.waitFor();
        }

        return processOutput.toString().trim();
    }
}
于 2018-07-24T13:52:59.013 回答
0

没有办法做到这一点,你可以做的一件事是Runtime.getRuntime().availableProcessors()在你的应用程序中创建一个线程池,并在请求进来时使用。

这样你就可以拥有 0 -Runtime.getRuntime().availableProcessors()线程数。

于 2012-07-31T11:03:21.947 回答
0

您可能无法可靠地查询操作系统或运行时,但您可以运行快速基准测试。

逐步增加自旋锁线程,测试每个新线程是否像以前一样迭代。一旦其中一个线程的性能低于之前每个测试的一半左右(至少对于英特尔,我不了解 SPARC),您就知道您已经开始与超线程共享一个内核。

于 2018-05-02T05:08:22.997 回答