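I'm using Rx and I want to bind/map a source stream to a destination stream, so that the source stream can be changed dynamically without affecting any subscriptions to the destination stream.
I'll lay out my (naive) solution here in the hope that someone can show me a better one.
I'm hoping there are existing extension methods that can be composed to achieve this result. If not, I'd like to write a custom extension method that simplifies my solution.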

    using System;
    using System.Reactive.Subjects;

    /// <summary>
    /// Used to bind a source stream to a destination stream.
    /// Clients can subscribe to the destination stream before the source stream has been bound.
    /// The source stream can be changed as desired without affecting the subscription to the destination stream.
    /// </summary>
    public class BindableStream<T>
    {
        /// <summary>
        /// The source stream that is only set when we bind it.
        /// </summary>
        private IObservable<T> sourceStream;

        /// <summary>
        /// Used to unsubscribe from the source stream.
        /// </summary>
        private IDisposable sourceStreamDisposer;

        /// <summary>
        /// Subject used as the destination stream.
        /// For passing data from source to dest stream.
        /// </summary>
        private readonly Subject<T> destStream = new Subject<T>();

        /// <summary>
        /// Get the destination stream. Clients can subscribe to this to receive data that is passed on from the source stream.
        /// Later on we can set or change the underlying source stream without affecting the destination stream.
        /// </summary>
        public IObservable<T> GetDestStream()
        {
            return destStream;
        }

        /// <summary>
        /// Bind to a source stream that is to be propagated to the destination stream.
        /// </summary>
        public void Bind(IObservable<T> sourceStream)
        {
            // Drop any previously bound source before subscribing to the new one.
            Unbind();
            this.sourceStream = sourceStream;
            this.sourceStreamDisposer = sourceStream.Subscribe(dataItem =>
            {
                // Pass the source item on to the client via the subject.
                destStream.OnNext(dataItem);
            });
        }

        /// <summary>
        /// Unsubscribe from the source stream.
        /// </summary>
        public void Unbind()
        {
            if (sourceStreamDisposer != null)
            {
                sourceStreamDisposer.Dispose();
            }
            sourceStreamDisposer = null;
            sourceStream = null;
        }
    }

Here is a very simple example of how to use it:

    using System;
    using System.Reactive.Linq;
    using System.Threading;

    class Program
    {
        static void Main(string[] args)
        {
            var bindableStream = new BindableStream<long>();

            // Subscribe before binding the source stream.
            bindableStream.GetDestStream().Subscribe(i => Console.WriteLine(i));
            Thread.Sleep(1000);

            // Bind a source stream.
            bindableStream.Bind(Observable.Interval(TimeSpan.FromSeconds(1)));
            Thread.Sleep(5000);

            // Bind a new source stream. The new Interval starts counting from 0 again.
            bindableStream.Bind(Observable.Interval(TimeSpan.FromSeconds(1)));
            Console.ReadKey();
        }
    }
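
For comparison, here is a minimal sketch of one direction that composes built-in operators instead of hand-rolling the subscription management. It is not a confirmed answer, only an illustration under stated assumptions: it relies on the `Switch` and `Publish` operators from the System.Reactive package, and the names `SwitchableStream` and `Demo` are hypothetical. A subject of streams is flattened with `Switch`, which follows the most recently pushed inner stream and unsubscribes from the previous one. `Publish` plus `Connect` keeps a single shared subscription, roughly matching the behavior of the `Subject`-based class above.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical alternative to BindableStream<T>, built on Switch.
public sealed class SwitchableStream<T> : IDisposable
{
    // Every Bind call pushes a new inner stream into this subject.
    private readonly Subject<IObservable<T>> sources = new Subject<IObservable<T>>();
    private readonly IConnectableObservable<T> destStream;
    private readonly IDisposable connection;

    public SwitchableStream()
    {
        // Switch follows the latest inner stream and drops the previous one.
        // Publish/Connect shares one subscription among all clients.
        destStream = sources.Switch().Publish();
        connection = destStream.Connect();
    }

    public IObservable<T> GetDestStream()
    {
        return destStream;
    }

    public void Bind(IObservable<T> sourceStream)
    {
        sources.OnNext(sourceStream);
    }

    public void Unbind()
    {
        // Switching to an empty stream detaches the current source
        // without completing the destination stream.
        sources.OnNext(Observable.Empty<T>());
    }

    public void Dispose()
    {
        connection.Dispose();
        sources.Dispose();
    }
}

public static class Demo
{
    // Uses subjects as sources so the sequence of events is deterministic.
    public static List<int> Run()
    {
        var received = new List<int>();
        using (var stream = new SwitchableStream<int>())
        {
            stream.GetDestStream().Subscribe(x => received.Add(x));

            var first = new Subject<int>();
            var second = new Subject<int>();

            stream.Bind(first);
            first.OnNext(1);   // delivered
            stream.Bind(second);
            first.OnNext(2);   // ignored: first is no longer bound
            second.OnNext(3);  // delivered
            stream.Unbind();
            second.OnNext(4);  // ignored: nothing is bound
        }
        return received;       // contains 1, 3
    }
}
```

One difference from the class above is worth noting: in this sketch an error raised by the bound source is forwarded to the destination stream and terminates it, whereas `BindableStream<T>` subscribes with an `OnNext` handler only.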