1

假设我们有几百万行必须解析的长文本。
在我的 i7 2600 CPU 上,每 1000 行解析大约需要 13 毫秒。
因此,解析 1,000,000 行大约需要 13 秒。
为了减少执行时间,我已经使用多个线程进行了管理。
使用阻塞队列,我将 1,000,000 行推送为一组 1000 个块,每个块包含 1000 行,并使用 8 个线程使用这些块。代码很简单,似乎可以正常工作,但是性能并不令人鼓舞,大约需要 11 秒。
这是多线程代码的主要部分:

    for(int i=0;i<threadCount;i++)
    {
        Runnable r=new Runnable() {
            public void run() {
                try{
                    while (true){
                        InputType chunk=inputQ.poll(10, TimeUnit.MILLISECONDS);
                        if(chunk==null){
                            if(inputRemains.get())
                                continue;
                            else
                                return;
                        }
                        processItem(chunk);
                    }
                }catch (Exception e) {
                    e.printStackTrace();  
                }
            }
        };
        Thread t=new Thread(r);
        threadList.add(t);
        for(Thread t: threads)
            t.join();

我也使用过 ExecutorService 但性能更差!
更改块大小也无济于事,性能也没有提高。
这意味着阻塞队列不是瓶颈。
另一方面,当我同时运行 4 个串行程序实例时,所有 4 个实例只需要 15 秒即可完成。这意味着我可以在 15 秒内使用 4 个进程处理 4,000,0000 行,因此,与多线程的 1.2 加速相比,速度提升在 3.4 左右,这是非常有希望的。

我想知道有人对此有任何想法吗?
问题很简单:阻塞队列中的一组行和几个线程从队列中轮询项目并并行处理它们。队列最初已填满,因此线程完全忙碌。
我以前也有类似的经历,但我不明白为什么多处理更好。
我还应该提到我在 Windows 7 上运行测试并使用 1.7 JRE。
欢迎任何想法,并在手前感谢。

4

4 回答 4

1

编辑:

所以我最初认为你的时间安排是围绕你的整个程序。如果您只是在将行读入内存对它们的处理进行计时,那么可能是您的processItem(chunk);方法正在执行自己的 IO,或者正在将信息写入synchronized对象或其他阻止它的共享变量能够同时满载运行。


我想知道有人对此有任何想法吗?

您的问题可能是您受IO 限制不是CPU 板。通过添加更多线程来显着提高速度的唯一方法是,如果您执行的 CPU 处理量比读取(或写入)磁盘的量多。一旦将磁盘子系统的 IO 功能发挥到极致,就无法提高处理速度。正如您所展示的,添加更多线程实际上会减慢 IO 绑定程序。

我会添加一个额外的线程(即 2 个处理线程),看看是否有帮助。如果您得到的只是 2 秒的速度提升,那么您将不得不将文件划分到多个驱动器上,或者如果这是一项重复任务,则必须将其移动到内存驱动器才能更快地读取它。

我也使用过 ExecutorService 但性能更差!

这可能是因为您使用了太多线程,或者每次迭代/块处理的行太少。

另一方面,当我同时运行 4 个串行程序实例时,所有 4 个实例只需要 15 秒即可完成

我怀疑这是因为他们每个人都可以从操作系统中使用彼此的磁盘缓存。当第一个应用程序读取块 #1 时,其他 3 个应用程序不必这样做。尝试复制文件 4 次并尝试在各自的文件上同时运行 4 个串行应用程序。你应该看到区别。

于 2012-10-16T20:59:27.127 回答
0

2600 为 8 个线程使用 HT(超线程).. 解析主要是内存工作,因此从 HT 中获益很少。

于 2012-11-14T07:35:02.380 回答
0

我会责怪你的代码并行化。如果项目可用于处理,则多个线程将竞争相同的资源(队列)。同步锁的争用是一个性能杀手。如果项目的处理速度比它们添加到队列中的速度快,那么被饿死的线程几乎只是繁忙的循环,例如。while (true) {}. 这是因为您的轮询时间很短,当轮询失败时,您只需立即重试。

关于同步的一点说明。首先,JVM 使用繁忙循环来等待资源可用,因为(通常)编写代码以尽快释放同步锁,而替代方案(进行上下文切换)非常昂贵。最终,如果 JVM 发现它大部分时间都在等待同步锁,那么如果它无法获取锁,它将默认切换到不同的线程。

更好的解决方案是让一个线程读取数据并在同时有可用的线程插槽和新线程的数据时分派一个新线程。在这里 Executor 会很有用,因为它可以跟踪哪些线程已经完成,哪些仍然很忙。但是伪代码看起来像:

int charsRead;
char[] buffer = new char[BUF_SIZE];
int startIndex = 0;

while((charsRead = inputStreamReader.read(buffer, startIndex, buffer.length)
                != -1) {
    // find last new line so don't give a thread any partial lines
    int lastNewLine = findFirstNewLineBeforeIndex(buffer, charsRead);

    waitForAvailableThread(); // if not max threads running then should return 
    // immediately
    Thread t = new Thread(createRunnable(buffer, lastNewLine));
    t.start();
    addRunningThread(t);

    // copy any overshoot to the start of a new buffer
    // use a new buffer as the another thread is now reading from the previous 
    // buffer
    char[] newBuffer = new char[BUF_SIZE];
    System.arraycopy(buffer, lastNewLine+1, newBuffer, 0, 
        charsRead-lastNewLine-1);
    buffer = newBuffer;
}
waitForRemainingThreadsToTerminate();
于 2012-10-16T21:58:02.033 回答
0

每 1000 行解析大约需要 13 毫秒。因此,解析 1,000,000 行大约需要 13 秒。

JVM 在完成 10,000 次之后才会预热,之后它可以快 10-100 倍,因此它可能是 13 秒,也可能是 130 毫秒或更短。

使用阻塞队列,我将 1,000,000 行推送为一组 1000 个块,每个块包含 1000 行,并使用 8 个线程使用这些块。代码很简单,似乎可以正常工作,但是性能并不令人鼓舞,大约需要 11 秒。

我建议你重新测试一个线程,你很可能会发现不到 11 秒。

瓶颈是将字符串解析为一行并创建字符串对象所花费的时间,其余的只是开销,没有解决真正的瓶颈。


如果您读取不同的文件,每个 cpu 一个,您可以接近线性加速。阅读行的问题是您必须一个接一个地阅读,而您从并发中获得的好处很少。

于 2012-10-16T22:00:55.413 回答