8

寻找一种干净的方法来将源转换为在一段时间内不发射项目后Observable发射单个(或标记值)。null

例如,如果源 observable 发射1, 2, 3然后在发射前停止发射 10 秒,4, 5, 6我希望发射的项目是1, 2, 3, null, 4, 5, 6.

该用例用于在 UI 中显示值,其中显示的值应该变成破折号-,或者N/A如果最后发出的值是陈旧的/旧的。

我调查了timeout操作员,但它Observable在超时发生时终止,这是不可取的。

使用 RxJava。

4

2 回答 2

5

基于akarnokd 的回答类似问题的回答,另一种实现:

单个哨兵值(根据 OP)

如果您正在寻找一个值来指示排放之间的时间间隔:

final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();

final long duration = 100;
final Observable<Integer> timeout = Observable.just(-1).delay(duration, TimeUnit.MILLISECONDS, scheduler)
    .concatWith(Observable.never())
    .takeUntil(subject)
    .repeat();

subject.mergeWith(timeout).subscribe(subscriber);

subject.onNext(1,   0);
subject.onNext(2, 100);
subject.onNext(3, 200);

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);

subject.onNext(4,   0);
subject.onNext(5, 100);
subject.onNext(6, 200);

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);

subscriber.assertNoTerminalEvent();
subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, 4, 5, 6));

连续的哨点值

如果您希望在源 observable 一段时间内不发射后连续接收值:

final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();

final long duration = 100;
final Observable<Integer> timeout = Observable.interval(duration, duration, TimeUnit.MILLISECONDS, scheduler)
    .map(x -> -1)
    .takeUntil(subject)
    .repeat();

subject.mergeWith(timeout).subscribe(subscriber);

subject.onNext(1,   0);
subject.onNext(2, 100);
subject.onNext(3, 200);

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);

subject.onNext(4,   0);
subject.onNext(5, 100);
subject.onNext(6, 200);

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);

subscriber.assertNoTerminalEvent();
subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, -1, -1, 4, 5, 6));

区别在于timeout可观察到的以及是否重复出现。

您可以根据需要替换-1null

以上所有内容均在 RxJava 1.0.17 中使用 Java 进行了测试1.8.0_72

于 2016-04-15T01:42:38.360 回答
3

你可以通过一个稍微复杂的发布-amb-timer 设置来实现这一点:

PublishSubject<Integer> ps = PublishSubject.create();
TestScheduler s = Schedulers.test();
TestSubscriber<Integer> ts = new TestSubscriber<>();

ps.publish(o -> 
    o.take(1).ambWith(Observable.timer(10, TimeUnit.SECONDS, s).map(v -> (Integer)null))
    .repeat().takeUntil(o.ignoreElements())
).subscribe(ts);

ps.onNext(1);
ps.onNext(2);
ps.onNext(3);

s.advanceTimeBy(15, TimeUnit.SECONDS);

ps.onNext(4);
ps.onNext(5);
ps.onNext(6);
ps.onCompleted();

ts.assertValues(1, 2, 3, null, 4, 5, 6);

发生的情况是源已发布,因此您可以从中逐个获取项目或计时器事件,确保最快的一个获胜并使用下一个值重复它,而无需一直重新订阅原始源。

编辑修复了上游完成 repeat() 进入无限循环的情况。

于 2016-03-08T17:09:30.600 回答