“排他性”更容易 - 您只需让其他人订阅排他观察者的过滤输出。
“默认”更难 - RX 编程是一种函数式编程,订阅者彼此不知道,而根据定义,拥有“默认”订阅者意味着在观察者之间共享一些状态。拥有共享状态的一种方法是使用来自TPL DataFlow的ConcurrentBag或 BufferBlock创建生产者/消费者队列。另一种方法是使用这样的类将“已处理”状态附加到事件本身:
public class Handled<T>
{
public bool IsHandled { get; set; }
public T Data { get; set; }
}
无论如何,在使用“默认”处理程序之前,您必须给观察者一些时间做出反应。下面的代码说明了“独占”和“默认”的概念:
var source = new[] {0, 1, 2, 3, 4}.ToObservable();
var afterExclusive = source
.Where(x =>
{
if (x == 0)
{
Console.WriteLine("exclusive");
return false;
}
return true;
})
.Select(x => new Handled<int> {Data = x})
.Publish(); // publish is a must otherwise
afterExclusive // we'll get non shared objects
.Do(x => { x.IsHandled = true; })
.Subscribe();
afterExclusive
.Delay(TimeSpan.FromSeconds(1))
.Where(x => !x.IsHandled)
.Subscribe(x => Console.WriteLine("missed by all {0}", x));
afterExclusive.Connect();