1

我需要处理一个大文件(具有列和相同格式的行)。由于我需要考虑程序在处理过程中崩溃的情况,我需要这个处理程序是可重试的,这意味着它崩溃后我再次启动程序,它可以继续处理从它失败的行开始的文件。

有什么我可以遵循的模式或我可以使用的库吗?谢谢!


更新:

关于崩溃案例,不仅仅是OOM或一些内部问题。这也可能是由于其他部件超时或机器崩溃造成的。所以 try/catch 无法处理这个问题。


另一个更新:

关于分块文件,在我的情况下是可行的,但并不像听起来那么简单。正如我所说,该文件被格式化为几列,我可以根据其中一列将其拆分为数百个文件,然后一一处理这些文件。但不是这样做,我想了解更多关于处理支持重试的大文件/数据的常见解决方案。

4

3 回答 3

1

我会怎么做(虽然不是专业人士)

  1. 创建一个LineProcessor在文件的每一行上调用

    类处理器实现 LineProcessor> {

            private List<String> lines = Lists.newLinkedList();
            private int startFrom = 0;
            private int lineNumber = 0;
    
            public Processor(int startFrom) {
                this.startFrom = startFrom;
            }
    
            @Override
            public List<String> getResult() {
                return lines;
            }
    
            @Override
            public boolean processLine(String arg0) throws IOException {
                lineNumber++;
                if (lineNumber < startFrom) {
                    // do nothing
                } else {
                    if (new Random().nextInt() % 50000 == 0) {
                        throw new IOException("Randomly thrown Exception " + lineNumber);
                    }
                     //Do the hardwork here
                    lines.add(arg0);
                    startFrom++;
                }
                return true;
            }
        }
    
  2. 创建一个使用我的LineProcessor来读取文件的Callable

    class Reader implements Callable<List<String>> {
    
        private int startFrom;
    
        public Reader(int startFrom) {
            this.startFrom = startFrom;
        }
    
        @Override
        public List<String> call() throws Exception {
            return Files.readLines(new File("/etc/dictionaries-common/words"),
                Charsets.UTF_8, new Processor(startFrom));
        }
    }
    
  3. Callable包装在 Retryer 中并使用Executor调用它

    public static void main(String[] args) throws InterruptedException, ExecutionException {
    BasicConfigurator.configure();
    
    ExecutorService executor = Executors.newSingleThreadExecutor();
    
    Future<List<String>> lines = executor.submit(RetryerBuilder
            .<List<String>> newBuilder()
            .retryIfExceptionOfType(IOException.class)
            .withStopStrategy(StopStrategies.stopAfterAttempt(100)).build()
            .wrap(new Reader(100)));
    
    logger.debug(lines.get().size());
    executor.shutdown();
    logger.debug("Happily Ever After");
    

    }

于 2013-03-21T10:56:24.990 回答
0

您可以在代码中维护检查点/提交样式逻辑。因此,当程序再次运行时,它会从同一个检查点开始。

您可以使用 RandomAccessFile 读取文件并使用 getFilePointer() 作为您保存的检查点。当您再次执行该程序时,您会通过调用 seek(offset) 从此检查点开始。

于 2013-03-21T07:52:11.673 回答
0

尝试从 OOM 错误中捕获将不会保存。您应该分块处理文件并在每次成功分块后将位置存储到文件系统/数据库/即使您的程序崩溃也保持持久性的任何地方。然后,您可以在重新启动软件时从存储位置读取上一点。您还必须在处理整个文件时清除此信息。

于 2013-03-21T07:53:41.510 回答