11

我正在探索反应式编程和 RxJava。这很有趣,但我被困在一个我找不到答案的问题上。我的基本问题是:终止原本无限运行的 Observable 的响应式方法是什么?我也欢迎对我的代码提出批评和反应式最佳实践。

作为练习,我正在编写一个日志文件尾部实用程序。日志文件中的行流由Observable<String>. 为了BufferedReader继续阅读添加到文件中的文本,我忽略了通常的reader.readLine() == null终止检查,而是将其解释为我的线程应该休眠并等待更多记录器文本。

但是,虽然我可以使用 终止观察者takeUntil,但我需要找到一种干净的方法来终止原本无限运行的文件观察者。我可以编写自己的terminateWatcher方法/字段,但这会破坏 Observable/Observer 封装——我希望尽可能严格地遵守反应式范式。

这是Observable<String>代码:

public class FileWatcher implements OnSubscribeFunc<String> {
    private Path path = . . .;

    @Override
    // The <? super String> generic is pointless but required by the compiler
    public Subscription onSubscribe(Observer<? super String> observer) {
        try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
            String newLine = "";
            while (!Thread.interrupted()) {  // How do I terminate this reactively?
                if ((newLine = reader.readLine()) != null)
                    observer.onNext(newLine);
                else
                    try {
                        // Wait for more text
                        Thread.sleep(250);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
            }
            observer.onCompleted();
        } catch (Exception e) {
            observer.onError(e);
        }

        return null;  // Not sure what Subscription I should return
    }
}

这是在新行出现时打印新行的 Observer 代码:

public static void main(String... args) {
    . . .
    Observable<String> lines = Observable.create(createWatcher(file));
    lines = lines.takeWhile(new Func1<String, Boolean>() {
        @Override
        public Boolean call(String line) {
            // Predicate for which to continue processing
            return !line.contains("shutdown");
        }
    }).subscribeOn(Schedulers.threadPoolForIO())
            .observeOn(Schedulers.currentThread());
    // Seems like I should use subscribeOn() and observeOn(), but they
    // make my tailer terminate without reading any text.

    Subscription subscription = lines.subscribe(new Action1<String>() {
        @Override
        public void call(String line) {
             System.out.printf("%20s\t%s\n", file, line);
        }
    });
}

我的两个问题是:

  1. 什么是终止原本无限运行的流的反应一致的方法?
  2. 我的代码中还有哪些其他错误让您哭泣?:)
4

1 回答 1

6

由于您在请求订阅时启动文件监视,因此在订阅结束时(即关闭时)终止它是有意义的Subscription。这解决了您的代码注释中的一个松散端:您返回一个Subscription告诉FileWatcher停止查看文件的 a。然后替换您的循环条件,使其检查订阅是否已被取消,而不是检查当前线程是否已被中断。

onSubscribe()问题是,如果您的方法永远不会返回,那将不是很有帮助。也许您FileWatcher应该要求指定一个调度程序,并在该调度程序上进行读取。

于 2013-12-18T18:48:39.497 回答