1

需要一些关于 RX 的帮助。我想定义 observable,它应该在创建第一个订阅时创建资源,为每个新订阅发布一次此资源实例,并且当所有订阅完成后,必须处置该资源实例。类似于 Observable.Using 的东西,但具有 Publish(value) 和 RefCount 行为。我所有使用标准运算符表达它的尝试都失败了。这段代码可以满足我的要求,但我认为必须有标准的方式来做到这一点。我真的不想重新发明轮子。

using System;
using System.Reactive.Linq;
using System.Reactive.Disposables;

namespace ConsoleApplication1
{
    class Program
    {
        static void Main()
        {
            // this part is what i can't express in standart RX operators..
            Res res = null;
            RefCountDisposable disp = null;
            var @using = Observable.Create<Res>(obs =>
                {
                    res = res ?? new Res();
                    disp = disp == null || disp.IsDisposed ? new RefCountDisposable(res) : disp;
                    obs.OnNext(res);
                    return new CompositeDisposable(disp.GetDisposable(), disp, Disposable.Create(() => res = null));
                });
            // end
            var sub1 = @using.Subscribe(Print);
            var sub2 = @using.Subscribe(Print);

            sub1.Dispose();
            sub2.Dispose();

            sub1 = @using.Subscribe(Print);
            sub2 = @using.Subscribe(Print);

            sub1.Dispose();
            sub2.Dispose();
            Console.ReadKey();
        }

        static void Print(object o)
        {
            Console.WriteLine(o.GetHashCode());
        }

    }
    class Res : IDisposable
    {
        public Res()
        {
            Console.WriteLine("CREATED");
        }

        public void Dispose()
        {
            Console.WriteLine("DISPOSED");
        }
    }

}

输出:

CREATED
1111
1111
DISPOSED
CREATED
2222
2222
DISPOSED

我对标准运营商的“最佳”尝试:

var @using = Observable.Using(() => new Res(), res => Observable.Never(res).StartWith(res))
     .Replay(1)
     .RefCount();

输出是:

CREATED
1111
1111
DISPOSED
CREATED
1111  <-- this is "wrong" value
2222
2222
DISPOSED

谢谢!

附言。对不起我糟糕的英语=(

4

3 回答 3

2

在有点头痛之后,我终于意识到问题Using.Replay.RefCount在于Replay内部调用Multicast单个ReplaySubject实例,但在我的具体情况下,我需要在每个新的第一次订阅Replay重新创建主题。通过谷歌我找到了 RXX 库,这ReconnectableObservable就是答案。它使用主题工厂而不是主题实例在每次Connect调用中重新创建主题(原始 rxx 代码,根本没有合同):

internal sealed class ReconnectableObservable<TSource, TResult> : IConnectableObservable<TResult>
{
    private ISubject<TSource, TResult> Subject
    {
        get { return _subject ?? (_subject = _factory()); }
    }

    private readonly object _gate = new object();
    private readonly IObservable<TSource> _source;
    private readonly Func<ISubject<TSource, TResult>> _factory;

    private ISubject<TSource, TResult> _subject;
    private IDisposable _subscription;

    public ReconnectableObservable(IObservable<TSource> source, Func<ISubject<TSource, TResult>> factory)
    {
        _source = source;
        _factory = factory;
    }

    public IDisposable Connect()
    {
        lock (_gate)
        {
            if (_subscription != null)
                return _subscription;

            _subscription = new CompositeDisposable(
                _source.Subscribe(Subject),
                Disposable.Create(() =>
                {
                    lock (_gate)
                    {
                        _subscription = null;
                        _subject = null;
                    }
                }));

            return _subscription;
        }
    }

    public IDisposable Subscribe(IObserver<TResult> observer)
    {
        lock (_gate)
        {
            return Subject.Subscribe(observer);
        }
    }
}

和一些扩展方法:

public static class Ext
{
    public static IConnectableObservable<T> Multicast<T>(this IObservable<T> obs, Func<ISubject<T>> subjectFactory)
    {
        return new ReconnectableObservable<T, T>(obs, subjectFactory);
    }

    public static IConnectableObservable<T> ReplayReconnect<T>(this IObservable<T> obs, int replayCount)
    {
        return obs.Multicast(() => new ReplaySubject<T>(replayCount));
    }

    public static IConnectableObservable<T> PublishReconnect<T>(this IObservable<T> obs)
    {
        return obs.Multicast(() => new Subject<T>());
    }
}

使用该代码,现在我可以这样做:

var @using = Observable
             .Using(() => new Res(), _ => Observable.Never(_).StartWith(_))
             .ReplayReconnect(1) // <-- that's it!
             .RefCount();

雅虎!它按预期工作。

感谢所有回答的人!你把我推向了正确的方向。

于 2013-10-17T02:49:12.323 回答
0

如果有办法使用标准运算符做到这一点,我看不到它。

问题是标准运算符中没有“当有订阅者时才缓存值”选项。

无论订阅者如何,重播运算符都会缓存最后一个值,这是您看到的“错误”值的根本原因。

它强调了 Using + Replay 是一个危险的组合,因为它发出了一个处置值。

我怀疑如果有人确实使用标准运算符管理了一些魔法,它就不会像您的 Observable.Create 实现那样可读。

我已经多次使用 Observable.Create 来创建我确信比使用标准运算符的等效构造更简洁、可读和可维护的代码。

我的建议是使用 Observable.Create 绝对没有任何问题 - 将您的代码包装在一个接受资源的好工厂方法中,您就可以开始了。这是我的尝试,它只是对代码的重新打包,添加了线程安全:

public static IObservable<T> CreateObservableRefCountedResource<T>(Func<T> resourceFactory)
    where T : class, IDisposable
{
    T resource = null;
    RefCountDisposable resourceDisposable = null;
    var gate = new object();

    return Observable.Create<T>(o =>
        {
            lock (gate)
            {
                resource = resource ?? resourceFactory();
                var disposeAction = Disposable.Create(() =>
                    {
                        lock (gate)
                        {
                            resource.Dispose();
                            resource = null;
                        }
                    });

                resourceDisposable = (resourceDisposable == null || resourceDisposable.IsDisposed)
                                         ? new RefCountDisposable(disposeAction)
                                         : resourceDisposable;

                o.OnNext(resource);
                return new CompositeDisposable(
                    resourceDisposable,
                    resourceDisposable.GetDisposable());
            }
        });
}

已编辑 - 忘记调用 resourceDisposable.GetDisposable()!

于 2013-10-16T08:04:32.887 回答
0

尝试这个:

var @using = Observable.Using(
        () => new Res(),
        res => Observable.Return(res).Concat(Observable.Never<Res>()))
    .Publish((Res)null)
    .RefCount()
    .SkipWhile(res => res == null);

Concat防止观察者在可观察对象产生其唯一值时自动取消订阅。

于 2013-10-15T16:33:45.010 回答