每当流 B 触发时,我想停止流 A 以获得一个通知。两个流都将保持在线并且永远不会完成。
A: o--o--o--o--o--o--o--o--o
B: --o-----o--------o-------
R: o-----o-----o--o-----o--o
或者
A: o--o--o--o--o--o--o--o--o
B: -oo----oo-------oo-------
R: o-----o-----o--o-----o--o
每当流 B 触发时,我想停止流 A 以获得一个通知。两个流都将保持在线并且永远不会完成。
A: o--o--o--o--o--o--o--o--o
B: --o-----o--------o-------
R: o-----o-----o--o-----o--o
或者
A: o--o--o--o--o--o--o--o--o
B: -oo----oo-------oo-------
R: o-----o-----o--o-----o--o
当 observable 很热(并且没有refCount
)时,此解决方案将起作用:
streamA
.takeUntil(streamB)
.skip(1)
.repeat()
.merge(streamA.take(1))
.subscribe(console.log);
.takeUntil(streamB)
:在流产生值时使流A
完成。B
.skip(1)
: 使流A
在开始时跳过一个值(或作为 的结果.repeat()
)。.repeat()
:使流A
无限重复(重新连接)。.merge(streamA.take(1))
: 偏移.skip(1)
流开始处的效果。使 A 流每 5 秒跳过一次的示例:
var streamA,
streamB;
streamA = Rx.Observable
.interval(1000)
.map(function (x) {
return 'A:' + x;
}).publish();
streamB = Rx.Observable
.interval(5000);
streamA
.takeUntil(streamB)
.skip(1)
.repeat()
.merge(streamA.take(1))
.subscribe(console.log);
streamA.connect();
也可以使用这个沙箱http://jsbin.com/gijorid/4/edit?js,consoleBACTION()
在运行代码的时候在控制台日志中执行手动push一个值到(streamB
有助于分析代码)。
这是我为类似问题SkipWhen
所做的运算符的一个版本(不同之处在于,在原始版本中,多个“B”会跳过多个“A”):
public static IObservable<TSource> SkipWhen<TSource, TOther>(this IObservable<TSource> source,
IObservable<TOther> other)
{
return Observable.Create<TSource>(observer =>
{
object lockObject = new object();
bool shouldSkip = false;
var otherSubscription = new MutableDisposable();
var sourceSubscription = new MutableDisposable();
otherSubscription.Disposable = other.Subscribe(
x => { lock(lockObject) { shouldSkip = true; } });
sourceSubscription.Disposable = source.Where(_ =>
{
lock(lockObject)
{
if (shouldSkip)
{
shouldSkip = false;
return false;
}
else
{
return true;
}
}
}).Subscribe(observer);
return new CompositeDisposable(
sourceSubscription, otherSubscription);
});
}
如果当前实现成为瓶颈,请考虑将锁实现更改为使用ReaderWriterLockSlim
.