我有一个类,它接收一个事件流,然后推出另一个事件流。
所有事件都使用响应式扩展 (RX)。传入的事件流从外部源推送到IObserver<T>
using .OnNext
,而传出的事件流使用IObservable<T>
and推送出去.Subscribe
。我Subject<T>
用来管理这个,在幕后。
我想知道 RX 中有什么技术可以暂时暂停输出。这意味着传入事件将在内部队列中建立,当它们被取消暂停时,事件将再次流出。
我有一个类,它接收一个事件流,然后推出另一个事件流。
所有事件都使用响应式扩展 (RX)。传入的事件流从外部源推送到IObserver<T>
using .OnNext
,而传出的事件流使用IObservable<T>
and推送出去.Subscribe
。我Subject<T>
用来管理这个,在幕后。
我想知道 RX 中有什么技术可以暂时暂停输出。这意味着传入事件将在内部队列中建立,当它们被取消暂停时,事件将再次流出。
这是我使用 Buffer 和 Window 运算符的解决方案:
public static IObservable<T> Pausable<T>(this IObservable<T> source, IObservable<bool> pauser)
{
var queue = source.Buffer(pauser.Where(toPause => !toPause),
_ => pauser.Where(toPause => toPause))
.SelectMany(l => l.ToObservable());
return source.Window(pauser.Where(toPause => toPause).StartWith(true),
_ => pauser.Where(toPause => !toPause))
.Switch()
.Merge(queue);
}
窗口在订阅时打开,每次从暂停流接收到“真”时。当 pauser 提供 'false' 值时,它会关闭。
Buffer 做了它应该做的事情,缓冲来自暂停器的 'false' 和 'true' 之间的值。一旦 Buffer 接收到“真”,它就会输出 IList 值,这些值会立即立即流式传输。
DotNetFiddle 链接:https ://dotnetfiddle.net/vGU5dJ
这是一个相当简单的 Rx 方法来做你想做的事。我创建了一个名为的扩展方法Pausable
,它采用源 observable 和第二个 observableboolean
来暂停或恢复 observable。
public static IObservable<T> Pausable<T>(
this IObservable<T> source,
IObservable<bool> pauser)
{
return Observable.Create<T>(o =>
{
var paused = new SerialDisposable();
var subscription = Observable.Publish(source, ps =>
{
var values = new ReplaySubject<T>();
Func<bool, IObservable<T>> switcher = b =>
{
if (b)
{
values.Dispose();
values = new ReplaySubject<T>();
paused.Disposable = ps.Subscribe(values);
return Observable.Empty<T>();
}
else
{
return values.Concat(ps);
}
};
return pauser.StartWith(false).DistinctUntilChanged()
.Select(p => switcher(p))
.Switch();
}).Subscribe(o);
return new CompositeDisposable(subscription, paused);
});
}
它可以这样使用:
var xs = Observable.Generate(
0,
x => x < 100,
x => x + 1,
x => x,
x => TimeSpan.FromSeconds(0.1));
var bs = new Subject<bool>();
var pxs = xs.Pausable(bs);
pxs.Subscribe(x => { /* Do stuff */ });
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
现在,我唯一不能完全理解您所说的“传入事件流是一个IObserver<T>
”是什么意思。流是IObservable<T>
. 观察者不是流。听起来你在这里没有做某事。你能补充你的问题并进一步解释吗?
您可以使用Observable
.
一旦您的 pauseObservable 发出“暂停”值,缓冲事件直到 pauseObservable 发出“未暂停”值。
这是一个使用Dave Sexton 的 BufferUntil 实现和Timothy Shields 的 Observable 逻辑的示例(来自我自己的问题)
//Input events, hot observable
var source = Observable.Interval(TimeSpan.FromSeconds(1))
.Select(i => i.ToString())
.Publish().RefCount();
//Simulate pausing from Keyboard, not actually relevant within this answer
var pauseObservable = Observable.FromEventPattern<KeyPressEventHandler, KeyPressEventArgs>(
k => KeyPressed += k, k => KeyPressed -= k)
.Select(i => i.EventArgs.PressedKey)
.Select(i => i == ConsoleKey.Spacebar) //space is pause, others are unpause
.DistinctUntilChanged();
//Let events through when not paused
var notPausedEvents = source.Zip(pauseObservable.MostRecent(false), (value, paused) => new {value, paused})
.Where(i => !i.paused) //not paused
.Select(i => i.value)
.Subscribe(Console.WriteLine);
//When paused, buffer until not paused
var pausedEvents = pauseObservable.Where(i => i)
.Subscribe(_ =>
source.BufferUntil(pauseObservable.Where(i => !i))
.Select(i => String.Join(Environment.NewLine, i))
.Subscribe(Console.WriteLine));
改进空间:可能将两个订阅源(pausedEvents 和 notPausedEvents)合并为一个。