7

我有一个IObservable<IObservable<T>>每个内部IObservable<T>都是一个值流,然后是一个最终OnCompleted事件。

我想将其转换为IObservable<IEnumerable<T>>, 一个由任何未完成的内部流中的最新值组成的流。IEnumerable<T>每当从内部流之一产生新值(或内部流到期)时,它都应该产生一个新值

它最容易用大理石图显示(我希望它足够全面):

input ---.----.---.----------------
         |    |   '-f-----g-|      
         |    'd------e---------|
         'a--b----c-----|          

result ---a--b-b--c-c-c-e-e-e---[]-
               d  d d e f g        
                    f f            

([]为空IEnumerable<T>-|代表 OnCompleted)

你可以看到它有点像一个CombineLatest操作。我一直在玩,JoinGroupJoin无济于事,但我觉得这几乎可以肯定是前进的正确方向。

我想在这个运算符中使用尽可能少的状态。

更新

我已经更新了这个问题,不仅包括单值序列——结果IObservable<IEnumerable<T>>应该只包括每个序列的最新值——如果一个序列没有产生一个值,它不应该被包括在内。

4

3 回答 3

3

这是昨天基于您的解决方案的版本,针对新要求进行了调整。基本思想是将引用放入易腐烂的集合中,然后在内部序列产生新值时更新引用的值。

我还进行了修改以正确跟踪内部订阅并在外部 observable 取消订阅时取消订阅。

如果任何流产生错误,也进行了修改以将其全部拆除。

最后,我修复了一些可能违反 Rx 指南的竞争条件。如果您的内部 observables 从不同的线程同时触发,您可能会obs.OnNext同时结束调用,这是一个很大的禁忌。因此,我使用相同的锁对每个内部 observable 进行了门控以防止这种情况发生(参见Synchronize调用)。请注意,正因为如此,您可能会使用常规的双链表而PerishableCollection不是PerishableCollection.

// Acts as a reference to the current value stored in the list
private class BoxedValue<T>
{
    public T Value;
    public BoxedValue(T initialValue) { Value = initialValue; }
}

public static IObservable<IEnumerable<T>> MergeLatest<T>(this IObservable<IObservable<T>> source)
{
    return Observable.Create<IEnumerable<T>>(obs =>
    {
        var collection = new PerishableCollection<BoxedValue<T>>();
        var outerSubscription = new SingleAssignmentDisposable();
        var subscriptions = new CompositeDisposable(outerSubscription);
        var innerLock = new object();

        outerSubscription.Disposable = source.Subscribe(duration =>
        {
            BoxedValue<T> value = null;
            var lifetime = new DisposableLifetime(); // essentially a CancellationToken
            var subscription = new SingleAssignmentDisposable();

            subscriptions.Add(subscription);
            subscription.Disposable = duration.Synchronize(innerLock)
                .Subscribe(
                    x =>
                    {
                        if (value == null)
                        {
                            value = new BoxedValue<T>(x);
                            collection.Add(value, lifetime.Lifetime);
                        }
                        else
                        {
                            value.Value = x;
                        }
                        obs.OnNext(collection.CurrentItems().Select(p => p.Value.Value));
                    },
                    obs.OnError, // handle an error in the stream.
                    () => // on complete
                    {
                        if (value != null)
                        {
                            lifetime.Dispose(); // removes the item
                            obs.OnNext(collection.CurrentItems().Select(p => p.Value.Value));
                            subscriptions.Remove(subscription); // remove this subscription
                        }
                    }
            );
        });

        return subscriptions;
    });
}
于 2013-07-29T22:27:00.720 回答
0

此解决方案适用于单项流,但不幸的是会将每个项累积到内部流中,直到完成。

public static IObservable<IEnumerable<T>> MergeLatest<T>(this IObservable<IObservable<T>> source)
{
    return Observable.Create<IEnumerable<T>>(obs =>
    {
        var collection = new PerishableCollection<T>();
        return source.Subscribe(duration =>
        {
            var lifetime = new DisposableLifetime(); // essentially a CancellationToken
            duration
                .Subscribe(
                    x => // on initial item
                    {
                        collection.Add(x, lifetime.Lifetime);
                        obs.OnNext(collection.CurrentItems().Select(p => p.Value));
                    },
                    () => // on complete
                    {
                        lifetime.Dispose(); // removes the item
                        obs.OnNext(collection.CurrentItems().Select(p => p.Value));
                    }
            );
        });
    });
}
于 2013-07-28T23:26:00.603 回答
0

Rxx的创建者Dave Sexton 给出的另一个解决方案- 它使用Rxx.CombineLatest,它的实现似乎与 Brandon 的解决方案非常相似:

public static IObservable<IEnumerable<T>> CombineLatestEagerly<T>(this IObservable<IObservable<T>> source)
{
  return source
    // Reify completion to force an additional combination:
    .Select(o => o.Select(v => new { Value = v, HasValue = true })
                  .Concat(Observable.Return(new { Value = default(T), HasValue = false })))
    // Merge a completed observable to force combination with the first real inner observable:
    .Merge(Observable.Return(Observable.Return(new { Value = default(T), HasValue = false })))
    .CombineLatest()
    // Filter out completion notifications:
    .Select(l => l.Where(v => v.HasValue).Select(v => v.Value));
}
于 2013-07-31T10:59:57.927 回答