如果您在尝试时被要求拆分工作,我会这样做:
public class App {
public static class Statistics {
}
public static class StatisticsCalculator implements Callable<Statistics> {
private final List<String> lines;
public StatisticsCalculator(List<String> lines) {
this.lines = lines;
}
@Override
public Statistics call() throws Exception {
//do stuff with lines
return new Statistics();
}
}
public static void main(String[] args) {
final File file = new File("path/to/my/file");
final List<List<String>> partitionedWork = partitionWork(readLines(file), 10);
final List<Callable<Statistics>> callables = new LinkedList<>();
for (final List<String> work : partitionedWork) {
callables.add(new StatisticsCalculator(work));
}
final ExecutorService executorService = Executors.newFixedThreadPool(Math.min(partitionedWork.size(), 10));
final List<Future<Statistics>> futures;
try {
futures = executorService.invokeAll(callables);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
try {
for (final Future<Statistics> future : futures) {
final Statistics statistics = future.get();
//do whatever to aggregate the individual
}
} catch (InterruptedException | ExecutionException ex) {
throw new RuntimeException(ex);
}
executorService.shutdown();
try {
executorService.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
static List<String> readLines(final File file) {
//read lines
return new ArrayList<>();
}
static List<List<String>> partitionWork(final List<String> lines, final int blockSize) {
//divide up the incoming list into a number of chunks
final List<List<String>> partitionedWork = new LinkedList<>();
for (int i = lines.size(); i > 0; i -= blockSize) {
int start = i > blockSize ? i - blockSize : 0;
partitionedWork.add(lines.subList(start, i));
}
return partitionedWork;
}
}
我创建了一个Statistics
对象,它保存了完成工作的结果。
有一个StatisticsCalculator
对象是 a Callable<Statistics>
- 这会进行计算。它被赋予 aList<String>
并处理行并创建Statistics
.
readLines
我留给你实施的方法。
在许多方面最重要的方法是partitionWork
方法,它将List<String>
文件中所有行的传入划分为List<List<String>>
使用blockSize
. 这基本上决定了每个线程应该有多少工作,blockSize
参数的调整非常重要。好像每件作品只有一行,那么开销可能会超过优势,而如果每件作品有一万行,那么你只有一个工作Thread
。
最后,操作的核心是main
方法。这将调用读取然后分区方法。它产生一个ExecutorService
线程数等于工作位数但最多 10 个的线程。您可以通过方法使其等于您拥有的内核数。
然后,该main
方法将List
所有Callable
s 中的一个(每个块一个)提交给executorService
. 该invokeAll
方法阻塞,直到工作完成。
该方法现在遍历每个返回List<Future>
并获取每个生成的Statistics
对象;准备聚合。
之后不要忘记关闭它,executorService
因为它会阻止您的申请表退出。
编辑
OP想要逐行阅读所以这里是一个修改过的main
public static void main(String[] args) throws IOException {
final File file = new File("path/to/my/file");
final ExecutorService executorService = Executors.newFixedThreadPool(10);
final List<Future<Statistics>> futures = new LinkedList<>();
try (final BufferedReader reader = new BufferedReader(new FileReader(file))) {
List<String> tmp = new LinkedList<>();
String line = null;
while ((line = reader.readLine()) != null) {
tmp.add(line);
if (tmp.size() == 100) {
futures.add(executorService.submit(new StatisticsCalculator(tmp)));
tmp = new LinkedList<>();
}
}
if (!tmp.isEmpty()) {
futures.add(executorService.submit(new StatisticsCalculator(tmp)));
}
}
try {
for (final Future<Statistics> future : futures) {
final Statistics statistics = future.get();
//do whatever to aggregate the individual
}
} catch (InterruptedException | ExecutionException ex) {
throw new RuntimeException(ex);
}
executorService.shutdown();
try {
executorService.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
这将逐行流式传输文件,并且在给定数量的行之后触发一个新任务以将行处理到执行程序。
clear
完成后,您需要调用List<String>
in ,Callable
因为实例是它们返回Callable
的 s 的引用。Future
如果您在完成后清除List
s ,这应该会大大减少内存占用。
进一步的增强可能是使用这里的建议来阻塞直到有一个备用线程 - 这将保证如果您在 s 完成时清除s ,那么一次内存中的行ExecutorService
数不会超过s 。threads*blocksize
List
Callable