1

我从文件中读取行,当然是在一个线程中。行是按键排序的。

然后我收集具有相同键的行(15-20行),进行解析,大计算等,并将结果对象推送到统计类。

我想并行我的程序以在一个线程中读取,在多个线程中进行解析和计算,并将结果连接到一个线程中以写入 stat 类。

java7框架中是否有针对此问题的现成模式或解决方案?

我通过执行器实现多线程、推送到blockingQueue以及在另一个线程中读取队列,但我认为我的代码很糟糕并且会产生错误

非常感谢

更新:

我无法映射内存中的所有文件 - 它非常大

4

4 回答 4

2

您已经记住了主要的方法类别。CountDownLatch、Thread.join、Executors、Fork/Join。另一种选择是 Akka 框架,它的消息传递开销在 1-2 微秒内测量,并且是开源的。但是,让我分享另一种通常优于上述方法并且更简单的方法,这种方法源于许多公司在 Java 中处理批处理文件加载。

假设您拆分工作的目标是表现,而不是学习。以从开始到结束需要多长时间来衡量的性能。然后,通常很难使其比内存映射文件更快,并且在已固定到单个内核的单个线程中处理。它也提供了更简单的代码。双赢。

这可能与直觉相反,但是处理文件的速度几乎总是受到文件加载效率的限制。不是处理的并行程度。因此,内存映射文件是一个巨大的胜利。一旦内存映射,我们希望算法在执行文件加载时与硬件的争用较低。现代硬件倾向于将 IO 控制器和内存控制器与 CPU 放在同一个插槽上;当与 CPU 本身中的预取器结合使用时,当从单个线程以有序方式处理文件时,会带来非常高的效率。这可能非常极端,以至于并行运行实际上可能要慢得多。将线程固定到内核通常会将内存绑定算法的速度提高 5 倍。这就是内存映射部分如此重要的原因。

如果您还没有,请尝试一下。

于 2013-03-15T13:44:11.087 回答
1

没有事实和数字,很难给你建议。所以让我们从头开始:

  1. 您必须确定瓶颈。您真的需要并行执行计算还是您的工作受 IO 限制?尽可能避免并发,它可能会更快。
  2. 如果计算必须并行完成,您必须决定您的任务必须是细粒度还是粗粒度。您需要测量您的计算和任务才能确定它们的大小。避免创建太多任务
  3. 您应该有一个 IO 线程、几个工人和一个“数据收集器”线程。没有可变数据。
  4. 一定不要因为任务提交而拖慢IO线程。否则,您应该使用更粗粒度的任务或使用更好的任务调度程序(谁说破坏者?)
  5. “数据收集器”线程应该是唯一改变最终状态的线程
  6. 避免不必要的数据复制和对象创建。很多时候,在对大文件进行迭代时,瓶颈是 GC。上周,我用享元模式替换标准 scala 对象实现了 6 倍的加速。您还应该尝试预先分配所有内容并使用大缓冲区(页面大小)。
  7. 避免磁盘寻道。

话虽如此,您应该是正确的轨道之一。您可以从使用适当大小的任务的 Executor 开始。任务写入数据结构,如您的阻塞队列,在工作人员和“数据收集器”线程之间共享。这种线程模型非常简单、高效且不易出错。它通常足够有效。如果您仍然需要更好的性能,那么您必须分析您的应用程序并了解瓶颈。然后你可以决定要走的路:优化你的任务规模,使用像disruptor/Akka这样更快的工具,改进IO,创建更少的对象,调整你的代码,购买更大的机器或更快的磁盘,迁移到Hadoop等。固定每个线程到核心(需要特定于平台的代码)也可以提供显着的提升。

于 2013-03-15T19:26:25.057 回答
0

如果您在尝试时被要求拆分工作,我会这样做:

public class App {

    public static class Statistics {
    }

    public static class StatisticsCalculator implements Callable<Statistics> {

        private final List<String> lines;

        public StatisticsCalculator(List<String> lines) {
            this.lines = lines;
        }

        @Override
        public Statistics call() throws Exception {
            //do stuff with lines
            return new Statistics();
        }
    }

    public static void main(String[] args) {
        final File file = new File("path/to/my/file");
        final List<List<String>> partitionedWork = partitionWork(readLines(file), 10);
        final List<Callable<Statistics>> callables = new LinkedList<>();
        for (final List<String> work : partitionedWork) {
            callables.add(new StatisticsCalculator(work));
        }
        final ExecutorService executorService = Executors.newFixedThreadPool(Math.min(partitionedWork.size(), 10));
        final List<Future<Statistics>> futures;
        try {
            futures = executorService.invokeAll(callables);
        } catch (InterruptedException ex) {
            throw new RuntimeException(ex);
        }
        try {
            for (final Future<Statistics> future : futures) {
                final Statistics statistics = future.get();
                //do whatever to aggregate the individual
            }
        } catch (InterruptedException | ExecutionException ex) {
            throw new RuntimeException(ex);
        }
        executorService.shutdown();
        try {
            executorService.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException ex) {
            throw new RuntimeException(ex);
        }
    }

    static List<String> readLines(final File file) {
        //read lines
        return new ArrayList<>();
    }

    static List<List<String>> partitionWork(final List<String> lines, final int blockSize) {
        //divide up the incoming list into a number of chunks
        final List<List<String>> partitionedWork = new LinkedList<>();
        for (int i = lines.size(); i > 0; i -= blockSize) {
            int start = i > blockSize ? i - blockSize : 0;
            partitionedWork.add(lines.subList(start, i));
        }
        return partitionedWork;
    }
}

我创建了一个Statistics对象,它保存了完成工作的结果。

有一个StatisticsCalculator对象是 a Callable<Statistics>- 这会进行计算。它被赋予 aList<String>并处理行并创建Statistics.

readLines我留给你实施的方法。

在许多方面最重要的方法是partitionWork方法,它将List<String>文件中所有行的传入划分为List<List<String>>使用blockSize. 这基本上决定了每个线程应该有多少工作,blockSize参数的调整非常重要。好像每件作品只有一行,那么开销可能会超过优势,而如果每件作品有一万行,那么你只有一个工作Thread

最后,操作的核心是main方法。这将调用读取然后分区方法。它产生一个ExecutorService线程数等于工作位数但最多 10 个的线程。您可以通过方法使其等于您拥有的内核数。

然后,该main方法将List所有Callables 中的一个(每个块一个)提交给executorService. 该invokeAll方法阻塞,直到工作完成。

该方法现在遍历每个返回List<Future>并获取每个生成的Statistics对象;准备聚合。

之后不要忘记关闭它,executorService因为它会阻止您的申请表退出。

编辑

OP想要逐行阅读所以这里是一个修改过的main

 public static void main(String[] args) throws IOException {
    final File file = new File("path/to/my/file");
    final ExecutorService executorService = Executors.newFixedThreadPool(10);
    final List<Future<Statistics>> futures = new LinkedList<>();
    try (final BufferedReader reader = new BufferedReader(new FileReader(file))) {
        List<String> tmp = new LinkedList<>();
        String line = null;
        while ((line = reader.readLine()) != null) {
            tmp.add(line);
            if (tmp.size() == 100) {
                futures.add(executorService.submit(new StatisticsCalculator(tmp)));
                tmp = new LinkedList<>();
            }
        }
        if (!tmp.isEmpty()) {
            futures.add(executorService.submit(new StatisticsCalculator(tmp)));
        }
    }
    try {
        for (final Future<Statistics> future : futures) {
            final Statistics statistics = future.get();
            //do whatever to aggregate the individual
        }
    } catch (InterruptedException | ExecutionException ex) {
        throw new RuntimeException(ex);
    }
    executorService.shutdown();
    try {
        executorService.awaitTermination(1, TimeUnit.DAYS);
    } catch (InterruptedException ex) {
        throw new RuntimeException(ex);
    }
}

这将逐行流式传输文件,并且在给定数量的行之后触发一个新任务以将行处理到执行程序。

clear完成后,您需要调用List<String>in ,Callable因为实例是它们返回Callable的 s 的引用。Future如果您在完成后清除Lists ,这应该会大大减少内存占用。

进一步的增强可能是使用这里的建议来阻塞直到有一个备用线程 - 这将保证如果您在 s 完成时清除s ,那么一次内存中的行ExecutorService数不会超过s 。threads*blocksizeListCallable

于 2013-03-15T12:31:33.017 回答
0

您可以使用 CountDownLatch http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html

同步线程的启动和加入。这比在线程集上循环并在每个线程引用上调用 join() 更好。

于 2013-03-15T12:23:28.483 回答