您可以对窗口进行操作,而不是保留聚合的订阅 - 这是您主要希望保持连接以使最后一个窗口到达的内容,并使用超时断开连接,以防分区时间过长。
这里使用了一个单独的类,因为 usingCreate
使它成为 Auto Detach - 在进行 dispose 调用后立即断开观察者的连接。所以从根本上说,Dispose 的含义就是这里发生了变化。
public static IObservable<T> DeferDisconnection<T>(this IObservable<T> observable, TimeSpan timeout)
{
return new ClosingObservable<T>(observable, timeout);
}
public class ClosingObservable<T> : IObservable<T>
{
private readonly IConnectableObservable<T> Source;
private readonly IDisposable Subscription;
private readonly TimeSpan Timeout;
public ClosingObservable(IObservable<T> observable, TimeSpan timeout)
{
Timeout = timeout;
Source = observable.Publish();
Subscription = Source.Connect();
}
public IDisposable Subscribe(IObserver<T> observer)
{
Source.Subscribe(observer);
return Disposable.Create(() => Source.Select(_ => new Unit())
.Amb(Observable.Timer(Timeout).Select(_ => new Unit()))
.Subscribe(_ => Subscription.Dispose())
);
}
}
测试:
var disposable =
Observable.Interval(TimeSpan.FromSeconds(2))
.Do(Console.WriteLine)
.DeferDisconnection(TimeSpan.FromSeconds(5))
.Subscribe();
Console.ReadLine();
disposable.Dispose();
Console.ReadLine();