0

我正在尝试使用 RX(Reactive extensions) 实现事件聚合器,这是我正在使用的代码,但 缺少subject.AddDisposable方法。谁能帮我?也许它是旧版本所以在新版本的 Rx 中这个方法被删除了?如果是这种情况,那么实现这一点的正确方法是什么?

 if (_observablesByTypeKey.ContainsKey(key))
            {
                Tuple<object, object> tuple = _observablesByTypeKey[key];
                stream = (IObservable<T>)tuple.Item2;
            }
            else
            {
                Type specificSubjectType = typeof(Subject<>).MakeGenericType(new[] { typeof(T) });
                var subject = (Subject<T>)Activator.CreateInstance(specificSubjectType, new object[] { });

                var removeEventStreamFromCache = Disposable.Create(
                    () =>
                        {
                            lock (_observablesByTypeKeyLock)
                            {
                                _observablesByTypeKey.Remove(key);
                            }
                        }
                    );

                stream = subject.AddDisposable(removeEventStreamFromCache).Publish().RefCount();

                var tuple = new Tuple<object, object>(subject, stream);
                _observablesByTypeKey.Add(key, tuple);
4

1 回答 1

2

你可以更换

stream = subject.AddDisposable(removeEventStreamFromCache).Publish().RefCount();

stream = Observable.Create(observer => 
  new CompositeDisposable(
    subject.Subscribe(observer),
    removeEventStreamFromCache));
stream.Publish().RefCount();
于 2012-04-24T23:53:03.130 回答