13

I want to use a Stream to parallelize the processing of a heterogeneous set of remotely stored JSON files of unknown number (the number of files is not known up front). The files vary widely in size, from 1 JSON record per file up to 100,000 records in some files. A JSON record here means a self-contained JSON object represented as one line of the file.

I really want to use Streams for this, so I implemented this Spliterator:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;

// JsonStreamSupport is my own helper (not shown) that reads one JSON line at a time
// and exposes the file's metadata and a close() method.
public abstract class JsonStreamSpliterator<METADATA, RECORD> extends AbstractSpliterator<RECORD> {

    abstract protected JsonStreamSupport<METADATA> openInputStream(String path);

    abstract protected RECORD parse(METADATA metadata, Map<String, Object> json);

    private static final int ADDITIONAL_CHARACTERISTICS = Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL;
    private static final int MAX_BUFFER = 100;
    private final Iterator<String> paths;
    private JsonStreamSupport<METADATA> reader = null;

    public JsonStreamSpliterator(Iterator<String> paths) {
        this(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths);
    }

    private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths) {
        super(est, additionalCharacteristics);
        this.paths = paths;
    }

    private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths, String nextPath) {
        this(est, additionalCharacteristics, paths);
        open(nextPath);
    }

    @Override
    public boolean tryAdvance(Consumer<? super RECORD> action) {
        if(reader == null) {
            String path = takeNextPath();
            if(path != null) {
                open(path);
            }
            else {
                return false;
            }
        }
        Map<String, Object> json = reader.readJsonLine();
        if(json != null) {
            RECORD item = parse(reader.getMetadata(), json);
            action.accept(item);
            return true;
        }
        else {
            reader.close();
            reader = null;
            return tryAdvance(action);
        }
    }

    private void open(String path) {
        reader = openInputStream(path);
    }

    private String takeNextPath() {
        synchronized(paths) {
            if(paths.hasNext()) {
                return paths.next();
            }
        }
        return null;
    }

    @Override
    public Spliterator<RECORD> trySplit() {
        String nextPath = takeNextPath();
        if(nextPath != null) {
            return new JsonStreamSpliterator<METADATA,RECORD>(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths, nextPath) {
                @Override
                protected JsonStreamSupport<METADATA> openInputStream(String path) {
                    return JsonStreamSpliterator.this.openInputStream(path);
                }
                @Override
                protected RECORD parse(METADATA metaData, Map<String,Object> json) {
                    return JsonStreamSpliterator.this.parse(metaData, json);
                }
            };              
        }
        else {
            List<RECORD> records = new ArrayList<RECORD>();
            while(tryAdvance(records::add) && records.size() < MAX_BUFFER) {
                // loop
            }
            if(records.size() != 0) {
                return records.spliterator();
            }
            else {
                return null;
            }
        }
    }
}

The problem I have is that while the Stream parallelizes just fine at first, eventually the largest file ends up being processed on a single thread. I believe the proximate cause is well documented: the spliterator is "unbalanced".

More specifically, it seems that trySplit is no longer called after a certain point in the lifecycle of Stream.forEach, so the extra logic at the end of trySplit that distributes small batches is rarely executed.

Notice how all the spliterators returned from trySplit share the same paths iterator. I thought this was a pretty clever way to balance the work across all the spliterators, but it has not been enough to achieve full parallelism.

I would like the parallel processing to proceed across files first, and then, when only a few large files are still being split, to parallelize across chunks of the remaining files. That was the intent of the else block at the end of trySplit.
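For reference, the spliterator is bridged into a parallel stream with StreamSupport before forEach runs; a minimal sketch of that bridge (the wrapper class and helper name are only illustrative):

import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class JsonStreamExample {
    // The "true" flag requests a parallel stream, which is what makes the
    // framework start calling trySplit on the spliterator above.
    static <M, R> Stream<R> recordsOf(JsonStreamSpliterator<M, R> spliterator) {
        return StreamSupport.stream(spliterator, true);
    }
}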

Is there an easy / simple / canonical way around this problem?


3 Answers

1

Your trySplit should output splits of the same size regardless of the size of the underlying files. You should treat all the files as a single unit and fill each ArrayList-backed spliterator with the same number of JSON objects every time. The number of objects should be such that processing one split takes between 1 and 10 milliseconds: below 1 ms you start getting close to the cost of handing a batch off to a worker thread; above that you start risking uneven CPU load due to tasks that are too coarse-grained.

A spliterator is not obligated to report a size estimate, and you are already doing this correctly: your estimate is Long.MAX_VALUE, which is a special value meaning "unbounded". However, if you have many files that contain a single JSON object each, resulting in batches of size 1, that will hurt your performance in two ways: the overhead of opening, reading, and closing files may become a bottleneck, and if you manage to escape that, the cost of a thread handoff may be significant compared to the cost of processing one item, again causing a bottleneck.
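A minimal sketch of that idea, wrapping whatever spliterator produces the records so that every split is an ArrayList-backed batch of a fixed size (the class name FixedBatchSpliterator and the batch size are illustrative, not prescriptive):

import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;

// Wraps a record-producing spliterator so every split is an ArrayList-backed
// batch of the same size, independent of which file the records came from.
public class FixedBatchSpliterator<T> extends Spliterators.AbstractSpliterator<T> {

    private final Spliterator<T> source;
    private final int batchSize; // tune so one batch takes roughly 1-10 ms to process

    public FixedBatchSpliterator(Spliterator<T> source, int batchSize) {
        super(Long.MAX_VALUE, source.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED));
        this.source = source;
        this.batchSize = batchSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        return source.tryAdvance(action);
    }

    @Override
    public Spliterator<T> trySplit() {
        List<T> batch = new ArrayList<>(batchSize);
        while (batch.size() < batchSize && source.tryAdvance(batch::add)) {
            // keep filling the batch
        }
        // An ArrayList spliterator reports SIZED, so the framework can schedule it cheaply.
        return batch.isEmpty() ? null : batch.spliterator();
    }
}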

I was solving a similar problem five years ago; you can have a look at my solution.

Answered 2019-10-29T16:02:31.297
0

After a lot of experimentation, I was still unable to get any additional parallelism by playing with the size estimates. Basically, any value other than Long.MAX_VALUE tends to cause the spliterator to terminate too early (and without any splitting), while on the other hand a Long.MAX_VALUE estimate causes trySplit to be called relentlessly until it returns null.

The solution I found was to share resources internally among the spliterators and let them rebalance the work between themselves.

Working code:

import java.io.InputStream;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Consumer;
import java.util.function.Function;

import com.amazonaws.services.s3.model.S3ObjectSummary;

public class AwsS3LineSpliterator<LINE> extends AbstractSpliterator<AwsS3LineInput<LINE>> {

    public final static class AwsS3LineInput<LINE> {
        final public S3ObjectSummary s3ObjectSummary;
        final public LINE lineItem;
        public AwsS3LineInput(S3ObjectSummary s3ObjectSummary, LINE lineItem) {
            this.s3ObjectSummary = s3ObjectSummary;
            this.lineItem = lineItem;
        }
    }

    private final class InputStreamHandler {
        final S3ObjectSummary file;
        final InputStream inputStream;
        InputStreamHandler(S3ObjectSummary file, InputStream is) {
            this.file = file;
            this.inputStream = is;
        }
    }

    private final Iterator<S3ObjectSummary> incomingFiles;

    private final Function<S3ObjectSummary, InputStream> fileOpener;

    private final Function<InputStream, LINE> lineReader;

    private final Deque<S3ObjectSummary> unopenedFiles;

    private final Deque<InputStreamHandler> openedFiles;

    private final Deque<AwsS3LineInput<LINE>> sharedBuffer;

    private final int maxBuffer;

    private AwsS3LineSpliterator(Iterator<S3ObjectSummary> incomingFiles, Function<S3ObjectSummary, InputStream> fileOpener,
            Function<InputStream, LINE> lineReader,
            Deque<S3ObjectSummary> unopenedFiles, Deque<InputStreamHandler> openedFiles, Deque<AwsS3LineInput<LINE>> sharedBuffer,
            int maxBuffer) {
        super(Long.MAX_VALUE, 0);
        this.incomingFiles = incomingFiles;
        this.fileOpener = fileOpener;
        this.lineReader = lineReader;
        this.unopenedFiles = unopenedFiles;
        this.openedFiles = openedFiles;
        this.sharedBuffer = sharedBuffer;
        this.maxBuffer = maxBuffer;
    }

    public AwsS3LineSpliterator(Iterator<S3ObjectSummary> incomingFiles, Function<S3ObjectSummary, InputStream> fileOpener, Function<InputStream, LINE> lineReader, int maxBuffer) {
        this(incomingFiles, fileOpener, lineReader, new ConcurrentLinkedDeque<>(), new ConcurrentLinkedDeque<>(), new ArrayDeque<>(maxBuffer), maxBuffer);
    }

    @Override
    public boolean tryAdvance(Consumer<? super AwsS3LineInput<LINE>> action) {
        AwsS3LineInput<LINE> lineInput;
        synchronized(sharedBuffer) {
            lineInput=sharedBuffer.poll();
        }
        if(lineInput != null) {
            action.accept(lineInput);
            return true;
        }
        InputStreamHandler handle = openedFiles.poll();
        if(handle == null) {
            S3ObjectSummary unopenedFile = unopenedFiles.poll();
            if(unopenedFile == null) {
                return false;
            }
            handle = new InputStreamHandler(unopenedFile, fileOpener.apply(unopenedFile));
        }
        for(int i=0; i < maxBuffer; ++i) {
            LINE line = lineReader.apply(handle.inputStream);
            if(line != null) {
                synchronized(sharedBuffer) {
                    sharedBuffer.add(new AwsS3LineInput<LINE>(handle.file, line));
                }
            }
            else {
                return tryAdvance(action);
            }
        }
        openedFiles.addFirst(handle);
        return tryAdvance(action);
    }

    @Override
    public Spliterator<AwsS3LineInput<LINE>> trySplit() {
        synchronized(incomingFiles) {
            if (incomingFiles.hasNext()) {
                unopenedFiles.add(incomingFiles.next());
                return new AwsS3LineSpliterator<LINE>(incomingFiles, fileOpener, lineReader, unopenedFiles, openedFiles, sharedBuffer, maxBuffer);
            } else {
                return null;
            }
        }
    }
}
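For illustration, a self-contained usage sketch, assuming the AWS SDK for Java v1 client and treating each line as a plain String; the bucket name and the naive byte-by-byte line reader below are placeholders, not part of the spliterator itself:

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.S3ObjectSummary;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.function.Function;
import java.util.stream.StreamSupport;

public class AwsS3LineSpliteratorExample {

    public static void main(String[] args) {
        AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
        String bucket = "my-bucket"; // placeholder

        Iterator<S3ObjectSummary> files =
                s3.listObjects(bucket).getObjectSummaries().iterator();

        Function<S3ObjectSummary, InputStream> fileOpener =
                summary -> s3.getObject(bucket, summary.getKey()).getObjectContent();

        // Naive line reader: reads one byte at a time and returns null at end of
        // stream. A real implementation should buffer and handle character encoding.
        Function<InputStream, String> lineReader = in -> {
            try {
                StringBuilder sb = new StringBuilder();
                int c;
                while ((c = in.read()) != -1 && c != '\n') {
                    sb.append((char) c);
                }
                return (c == -1 && sb.length() == 0) ? null : sb.toString();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };

        // All spliterators produced by trySplit share the same deques, so work keeps
        // being rebalanced across threads for as long as any file still has lines.
        StreamSupport.stream(new AwsS3LineSpliterator<>(files, fileOpener, lineReader, 100), true)
                .forEach(input -> System.out.println(
                        input.s3ObjectSummary.getKey() + ": " + input.lineItem));
    }
}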
Answered 2019-11-10T19:57:26.090
-1

This is not a direct answer to your question, but I think the Stream in the abacus-common library is worth a try:

void test_58601518() throws Exception {
    final File tempDir = new File("./temp/");

    // Prepare the test files:
    //    if (!(tempDir.exists() && tempDir.isDirectory())) {
    //        tempDir.mkdirs();
    //    }
    //
    //    final Random rand = new Random();
    //    final int fileCount = 1000;
    //
    //    for (int i = 0; i < fileCount; i++) {
    //        List<String> lines = Stream.repeat(TestUtil.fill(Account.class), rand.nextInt(1000) * 100 + 1).map(it -> N.toJSON(it)).toList();
    //        IOUtil.writeLines(new File("./temp/_" + i + ".json"), lines);
    //    }

    N.println("Xmx: " + IOUtil.MAX_MEMORY_IN_MB + " MB");
    N.println("total file size: " + Stream.listFiles(tempDir).mapToLong(IOUtil::sizeOf).sum() / IOUtil.ONE_MB + " MB");

    final AtomicLong counter = new AtomicLong();
    final Consumer<Account> yourAction = it -> {
        counter.incrementAndGet();
        it.toString().replace("a", "bbb");
    };

    long startTime = System.currentTimeMillis();
    Stream.listFiles(tempDir) // the file/data source could be local file system or remote file system.
            .parallel(2) // thread number used to load the file/data and convert the lines to Java objects.
            .flatMap(f -> Stream.lines(f).map(line -> N.fromJSON(Account.class, line))) // only a limited number of lines (fewer than 1,024) are loaded into memory at a time.
            .parallel(8) // thread number used to execute your action. 
            .forEach(yourAction);

    N.println("Took: " + ((System.currentTimeMillis()) - startTime) + " ms" + " to process " + counter + " lines/objects");

    // IOUtil.deleteAllIfExists(tempDir);
}

By the end, the CPU usage on my laptop was very high (around 70%), and it took about 70 seconds to process 51,899,100 lines/objects from 1,000 files on an Intel(R) Core(TM) i5-8365U CPU with Xmx256m of JVM memory. The total file size was about 4,524 MB. If yourAction is not a heavy operation, a sequential stream may be faster than a parallel one.

For what it's worth, I am the developer of abacus-common.

Answered 2021-03-07T21:47:22.703