5

使用 Rx,获取 Subject 中当前观察者数量的最佳方法是什么?

我有一个场景,我想发布一条消息,但前提是有观察者。如果没有观察者,我需要做其他事情。

为了解决这个问题,我所做的是创建自己的 ISubject 实现并公开内部 IObserver 集合的计数。我确信必须有一种开箱即用的方式来做到这一点,我只是不完全熟悉 Rx 所提供的功能。

谢谢!

4

3 回答 3

9

使用该Subject<T>.HasObservers物业。

源代码

我不记得它是什么时候引入的,但我很确定它并不总是在那里。它可能是在 Rx 2.0 中添加的。

于 2014-09-09T03:44:14.737 回答
5

您应该尽可能避免实现自己的可观察(或主题)实现。

您当然可以尝试编写一个包装类来提供帮助。

尝试这个:

public class Countable
{
    private int _count;
    public int Count { get { return _count; } }
    public IObservable<T> GetCountable<T>(IObservable<T> source)
    {
        return Observable.Create<T>(o =>
        {
            Interlocked.Increment(ref _count);
            var subscription = source.Subscribe(o);
            var decrement = Disposable.Create(() =>
            {
                Interlocked.Decrement(ref _count);
            });
            return new CompositeDisposable(subscription, decrement);
        });
    }
}

然后,您可以编写如下代码:

var xs = new Subject<int>();
var countable = new Countable();
var ys = countable.GetCountable(xs);
Console.WriteLine(countable.Count);
var s1 = ys.Subscribe(y => { });
Console.WriteLine(countable.Count);
var s2 = ys.Subscribe(y => { });
Console.WriteLine(countable.Count);
s1.Dispose();
Console.WriteLine(countable.Count);
s2.Dispose();
Console.WriteLine(countable.Count);

我的运行结果是:

0
1
2
1
0
于 2012-11-30T00:52:06.477 回答
4

使用 subject.observers.length,例如:

import {Subject} from 'rxjs'

let subject = new Subject()
let s1 = subject.subscribe(v => console.log('observerA: ' + v))

subject.next(1) // observerA: 1
console.log(subject.observers.length) // 1


let s2 = subject.subscribe(v => {
    console.log('observerB: ' + v)
    if(v===3) s2.unsubscribe()
})

subject.next(2) // observerA: 2
console.log(subject.observers.length) // 2

subject.next(3) // observerA: 3
console.log(subject.observers.length) // 1
于 2017-05-06T08:39:36.037 回答