4

我正在使用 RX,我想将源流绑定/映射到目标流,以便可以动态更改源流而不影响对目标流的任何订阅。

我将在这里布置我的(天真的)解决方案,希望有人可以向我展示更好的解决方案。

我希望有现有的扩展方法可以组合来实现这个结果。如果不是,我希望制作一个自定义扩展方法来简化我的解决方案。

/// <summary>
/// Used to bind a source stream to destination stream
/// Clients can subscribe to the destination stream before the source stream has been bound.
/// The source stream can be changed as desired without affecting the subscription to the destination stream.
/// </summary>
public class BindableStream<T>
{
    /// <summary>
    /// The source stream that is only set when we bind it.
    /// </summary>
    private IObservable<T> sourceStream;

    /// <summary>
    /// Used to unsubscribe from the source stream.
    /// </summary>
    private IDisposable sourceStreamDisposer;

    /// <summary>
    /// Subject used as the destination stream.
    /// For passing data from source to dest stream.
    /// </summary>
    private Subject<T> destStream = new Subject<T>();

    /// <summary>
    /// Get the destination stream. Clients can subscribe to this to receive data that is passed on from the source stream.
    /// Later on we can set or change the underlying source stream without affecting the destination stream.
    /// </summary>
    public IObservable<T> GetDestStream()
    {
        return destStream;
    }

    /// <summary>
    /// Bind to a source stream that is to be propagated to the destination stream.
    /// </summary>
    public void Bind(IObservable<T> sourceStream)
    {
        Unbind();

        this.sourceStream = sourceStream;
        this.sourceStreamDisposer = sourceStream.Subscribe(dataItem =>
        {
            //
            // Pass the source item on to the client via the subject.
            //
            destStream.OnNext(dataItem);
        });
    }

    /// <summary>
    /// Unsubscribe from the source stream.
    /// </summary>
    public void Unbind()
    {
        if (sourceStreamDisposer != null)
        {
            sourceStreamDisposer.Dispose();
        }

        sourceStreamDisposer = null;
        sourceStream = null;
    }

}

这是一个如何使用它的非常简单的示例:

static void Main(string[] args)
{
    var bindableStream = new BindableStream<long>();

    // Subscribe before binding the source stream.
    bindableStream.GetDestStream().Subscribe(i => Console.WriteLine(i));

    Thread.Sleep(1000);

    // Bind a source stream.
    bindableStream.Bind(Observable.Interval(TimeSpan.FromSeconds(1)));

    Thread.Sleep(5000);

    // Bind a new source stream.
    bindableStream.Bind(Observable.Interval(TimeSpan.FromSeconds(1)));

    Console.ReadKey();
}
4

2 回答 2

4

你可以使用Observable.Switch(...)操作符来得到你想要的。

Switch 创建一个“滚动”订阅。当一个新的 observable 产生时,它会处理它对前一个 observable 的订阅,并订阅新的 observable。

static void Main(string[] args)
{
    var streams = new Subject<IObservable<long>>();

    // Subscribe before binding the source stream.
    streams.Switch().Subscribe(Console.WriteLine);

    Thread.Sleep(1000);

    // Bind a source stream.
    streams.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));

    Thread.Sleep(5000);

    // Bind a new source stream.
    streams.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));

    Console.ReadKey();
}

或者,如果你知道你的“流”来自哪里......

static void Main(string[] args)
{
    var interval = Observable.IntervalTimeSpan.FromSeconds(1));

    var sourcesOvertime = new [] {
        // Yield the first source after one second
        Observable.Return(interval).Delay(TimeSpan.FromSeconds(1)),
        // Yield the second source after five seconds
        Observable.Return(interval).Delay(TimeSpan.FromSeconds(5))
    };

    sourcesOvertime
        // merge these together so we end up with a "stream" of our source observables
        .Merge()
        // Now only listen to the latest one.
        .SwitchLatest()
        // Feed the values from the latest source to the console.
        .Subscribe(Console.WriteLine);

    Console.ReadKey();
}

编辑:

作为BindableStream类的简化...

static void Main(string[] args)
{
    // var bindableStream = new BindableStream<long>();
    var bindableStream = new Subject<IObservable<long>>();
    var dest = bindableStream.Switch();

    // Subscribe before binding the source stream.
    // bindableStream.Subscribe(i => Console.WriteLine(i));
    dest.Subscribe(i => Console.WriteLine(i));

    Thread.Sleep(1000);

    // Bind a source stream.
    // bindableStream.Bind(Observable.Interval(TimeSpan.FromSeconds(1)));
    bindableStream.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));

    Thread.Sleep(5000);

    // Bind a new source stream.
    // bindableStream.Bind(Observable.Interval(TimeSpan.FromSeconds(1)));
    bindableStream.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));

    Thread.Sleep(4000);

    Console.WriteLine("Unbound!");

    // Unbind the source and dest streams.
    // bindableStream.Unbind();
    bindableStream.OnNext(Observable.Empty<long>());

    Console.ReadKey();
}

或者如果这太冗长......

public static class SubjectEx
{
    public static class OnNextEmpty<T>(this ISubject<IObservable<T>> subject)
   {
       subject.OnNext(Observable.Empty<T>());
   }
}
于 2014-08-03T03:14:51.257 回答
1

在@ChristopherHarris 输入后,我修改了我原来的解决方案。我认为这比我原来的示例要好得多,尽管我仍然希望能够将其归结为自定义扩展方法。

如果您能弄清楚如何简化此问题,请发布答案。

注意:使用 Switch 简化了我原来的解决方案,并且无需手动订阅源序列。

/// <summary>
/// Used to bind a source stream to destination stream
/// Clients can subscribe to the destination stream before the source stream has been bound.
/// The source stream can be changed as desired without affecting the subscription to the destination stream.
/// </summary>
public class BindableStream<T> : IObservable<T>
{
    /// <summary>
    /// Subject used as the destination stream.
    /// For passing data from source to dest stream.
    /// This is a stream of streams.
    /// When a new stream is added it replaces whichever stream was previously added.
    /// </summary>
    private Subject<IObservable<T>> destStream = new Subject<IObservable<T>>();

    /// <summary>
    /// Subscribe to the destination stream.
    /// Clients can subscribe to this to receive data that is passed on from the source stream.
    /// Later on we can set or change the underlying source stream without affecting the destination stream.
    /// </summary>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        return destStream.Switch().Subscribe(observer);
    }

    /// <summary>
    /// Bind to a new source stream that is to be propagated to the destination stream.
    /// </summary>
    public void Bind(IObservable<T> sourceStream)
    {
        destStream.OnNext(sourceStream);
    }

    /// <summary>
    /// Unbind the source stream.
    /// </summary>
    public void Unbind()
    {
        destStream.OnNext(Observable.Empty<T>());
    }
}

使用 'BindableStream' 的示例:

static void Main(string[] args)
{
    var bindableStream = new BindableStream<long>();

    // Subscribe before binding the source stream.
    bindableStream.Subscribe(i => Console.WriteLine(i));

    Thread.Sleep(1000);

    // Bind a source stream.
    bindableStream.Bind(Observable.Interval(TimeSpan.FromSeconds(1)));

    Thread.Sleep(5000);

    // Bind a new source stream.
    bindableStream.Bind(Observable.Interval(TimeSpan.FromSeconds(1)));

    Thread.Sleep(4000);

    Console.WriteLine("Unbound!");

    // Unbind the source and dest streams.
    bindableStream.Unbind();

    Console.ReadKey();
}
于 2014-08-03T08:11:05.543 回答