1

我正在从文件中读取内容,我用换行符将一些名称放入文件中,但是在通过 vertx 文件系统读取文件时,我无法提供所需的过滤器。每次打印文件的所有数据。

这是代码片段:-

vertx.fileSystem().open("data.txt", new OpenOptions(), handler -> {
                final AsyncFile asyncFile = handler.result();
                final Observable<Buffer> observable = asyncFile.toObservable();
                observable.subscribe(item -> {
                    final String[] split = item.toString().split("\n\r");
                    List<String> list = Arrays.asList(split);
                    final Observable<String> stringObservable = Observable.fromIterable(list);
                    stringObservable
                            .filter(name -> name.toString().startsWith("R"))
                            .take(2)
                            .subscribe(str -> System.out.println(str), err -> System.out.println(err), () -> System.out.println("Inner loop completed"));


                }, error -> System.out.println(error), () -> System.out.println("Completed !!!"));
            });

在我发现上述 observable 一口气拥有文件的所有数据后,我使用了内部 observable。

4

1 回答 1

3

如果我在“\n”而不是“\n\r”上拆分,这个代码片段实际上对我来说很好。如果您在打印所有内容时遇到问题,我的猜测是整个文件中的第一个字符是“R”,然后文件中的任何地方实际上都没有“\n\r”。因此,当您尝试拆分时,您最终会得到一个包含整个文件的大字符串。

也就是说,您可以通过使用rxOpen代替来显着简化此代码open,避免订阅嵌套,并简化如何将Buffer转换为Observable发出Buffer.

另请注意,我曾经RecordParser使用“\n”字符作为分隔符将文件的内容拆分为标记。直接AsyncFile变成Observable<Buffer>usingtoObservable()可能会导致Buffers 在一行中途中断,这会搞砸你的解析。

所以把它们放在一起:

vertx.fileSystem().rxOpen("data.txt", new OpenOptions())
    .flatMapObservable(af -> RecordParser.newDelimited("\n", af).toObservable())
    .map(Buffer::toString)
    .filter(name -> name.startsWith("R"))
    .take(2)
    .subscribe(System.out::println, System.err::println, () -> System.out.println("Completed"));

你从 13 行变为 6 行。

于 2020-07-31T16:50:06.290 回答