7

我曾经通过concatMap长时间运行的操作一次处理一个项目流。在某些时候,我需要“中断”这个长时间运行的操作,但仅限于当前项目:

@Test
public void main() throws InterruptedException {
    TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
            .concatMap(this::doLongRunningOperation)
            .test();

    Thread.sleep(10000);
    System.out.println("interrupt NOW");
    // Now I need to interrupt whichever longRunningOperation in
    // progress, but I don't want to interrupt the whole stream.
    // In other words, I want to force it to move onto the next
    // integer.
}

Observable<String> doLongRunningOperation(final Integer integer) {
    return Observable
            .just("\tStart working on " + integer,
                    "\tStill working on " + integer,
                    "\tAlmost done working on " + integer)

            // delay each item by 2 seconds
            .concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
            .doOnNext(System.out::println)
            .doFinally(() -> System.out.println("\tfinally for " + integer));
}

我试图通过保留“内部”流的一次性并在正确的时间处理它来解决这个问题。但这没有用。内部流已被处理,但concatMap从未继续处理第 3 项。测试只是挂起(因为外部 observable 也从未完成/终止/处理)

Disposable disposable = Disposables.empty();

@Test
public void main() throws InterruptedException {
    TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
            .concatMap(this::doLongRunningOperation)
            .test();

    Thread.sleep(10000);
    System.out.println("interrupt NOW");
    disposable.dispose();

    test.awaitTerminalEvent();
    System.out.println("terminal event");
}

Observable<String> doLongRunningOperation(final Integer integer) {
    return Observable
            .just("\tStart working on " + integer,
                    "\tStill working on " + integer,
                    "\tAlmost done working on " + integer)

            // delay each item by 2 seconds
            .concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
            .doOnNext(System.out::println)
            .doFinally(() -> System.out.println("\tfinally for " + integer))
            .doOnSubscribe(disposable -> {
                // save disposable so we can "interrupt" later
                System.out.println("Saving disposable for " + integer);
                Example.this.disposable = disposable;
            });
}

即使这确实有效,但依靠副作用似乎有点笨拙。实现这一目标的最佳方法是什么?

4

2 回答 2

1

我有与如何在 Retrofit with RxJava 中取消单个网络请求几乎相同的问题?. 我可以使用 aPublishSubject来“打断”

private PublishSubject interrupter;

@Test
public void main() throws InterruptedException {
    TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
            .concatMap(this::doLongRunningOperation)
            .test();

    Thread.sleep(10000);
    System.out.println("interrupt NOW");
    interrupter.onComplete();

    test.awaitTerminalEvent();
    System.out.println("terminal event");
}

Observable<String> doLongRunningOperation(final Integer integer) {
    interrupter = PublishSubject.create();

    return Observable
            .just("Start working on " + integer,
                    "Still working on " + integer,
                    "Almost done working on " + integer)
            // delay each item by 2 seconds
            .concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
            .doOnNext(System.out::println)
            .doFinally(() -> System.out.println("Finally for " + integer))
            .takeUntil(interrupter);
}
于 2019-05-13T21:33:28.763 回答
0

您只需要实现可以触发停止事件的信号量类型的 Observable。

实现这个信号量,例如:

PublishSubject<Boolean> semaphore = PublishSubject.create()

像这样修改在 .concatMap() 中创建的 Observable:

.concatMap(string ->
                        Observable.just(string)
                                .delay(2, TimeUnit.SECONDS)
                                .takeUntil(semaphore)) //Here is the stop trigger

当您需要取消当前处理项目时 - 只需调用:

   semaphore.onNext(true);
于 2019-06-27T07:28:47.770 回答