0

我在八个线程之间拆分计算并将结果写入文件,如下所示:

1a。七个线程中的每一个都处理其输入并将其输出写入自己的ByteArrayOutputStream;当流关闭时,线程offersan<Integer, ByteArrayOutputStream>到 a ConcurrentLinkedQueue,并调用countDown()a CountDownLatch(初始化为 7)。

1b。同时,第八个线程读入将在下一次迭代中处理的所有输入数据。该线程awaitsCountDownLatch完成读取其数据时。

2a. 当CountDownLatch达到 0 时,第 8 个线程唤醒,ConcurrentinkedQueue使用Integerin<Integer, ByteArrayOutputStream>作为排序键对 进行排序,然后遍历队列并将字节数组附加到文件中。(可能有一种更有效的方法来按顺序遍历列表而不对其进行排序,但列表中只有七个元素,因此排序方法的运行时不是问题。)

2b。同时,其他七个线程处理第八个线程为它们准备的输入。

** 此过程循环,直到处理完所有数据(通常为 40-80 次迭代)。

每个线程处理一个大小相等的输入块(可能在最后一次迭代中除外),大小为 8mb;每个ByteArrayOutputStream包含 1-4 mb,并且无法提前知道输出大小。通常,最早完成和最新完成的 CPU 绑定线程的运行时间彼此相差 20% 以内。

我想知道是否有一个 IO 库(或者我错过的 java.io 或 java.nio 中的方法)已经做了这样的事情 - 目前第八个线程(IO 线程)空闲大约 75%时间,但是我想出的任何减轻这种低效率的方法都让我觉得太复杂了(因此在造成死锁或数据竞争方面风险太大);例如,我可以将输入分成 4 mb 块,然后将两个块分配给七个 CPU 绑定线程,一个块分配给 IO 绑定线程,这在理论上可以将 IO 线程的空闲时间减少到 25%(25% on IO,50% 在 4 mb 块上,25% 空闲),但这是一个脆弱的解决方案,可能无法移植到另一个 CPU(这意味着在另一个 CPU 上,IO 绑定线程可能会变成一个瓶颈,例如它的运行时间是CPU 绑定线程的 150%) - 我'

4

2 回答 2

1

低效率在于在线程 8 处理任何一个输出之前等待所有 7 个输出完成。最好运行 7 个队列而不是 1 个,即每个源线程一个,并按必要的顺序读取它们。这样,当第一个队列有任何数据时,它会立即处理,而不必等待其他 6 个;队列 2..6 也是如此。当线程 8 完成最后一个队列时,它可以开始生产,或者实际上它可以这样做,而不是等待任何特定队列开始生产。

于 2013-05-16T03:43:17.513 回答
0

我已将算法修改如下:

  1. 我没有将输入块直接分配给 CPU 线程,而是将这些块放在BlockingQueueCPU 线程poll/take处理它们的工作块的
  2. 输出被发送到ConcurrentSkipListMap<Integer, ByteArrayOutputStream>
  3. CPU 线程简单地循环直到被取消。IO 线程查看ConcurrentSkipListMap(使用firstKey)以查看是否有任何数据要写入(我维护了一个计数器来记录下一个键应该是什么,以确保按顺序写入输出流),然后检查BlockingQueue查看是否需要向其中添加任何数据(如果queue.size() < N然后我向其中添加 N 更多块,其中 N 最初等于 12);如果它执行了一个或两个 IO 任务,则它循环,否则它处理来自BlockingQueue然后循环的块。

除非整个BlockingQueue输入已经由 IO 线程处理,否则不应为空 - 空队列表示queue.size() < N需要提高阈值。因此,CPU 线程的逻辑是

while(!cancel) {
    try {
        Input input = queue.poll();
        if(input == null) {
            log.warn("Empty queue");
            input = queue.take();
        }
        process(input);
    } catch (InterruptedException ex) {
        cancel = true;
    }
}
于 2013-05-16T16:17:18.720 回答