2

我有一个可观察的事件对象序列和一些处理特定类型事件的观察者。我需要完成以下场景:

  1. 某些事件类型需要由匹配条件的第一个观察者处理(例如 observable.SubscribeExclusively(x=>{}) 并且对其他事件类型变得“不可观察”。
  2. 如果没有订阅,请设置一些默认处理程序(例如 observable.SubscribeIfNoSubscriptions(x=>{})),以便不会丢失任何项目(例如,此处理程序可以将项目保存到数据库以便稍后处理)。

有没有办法用 Rx 处理这些情况?

4

2 回答 2

1

“排他性”更容易 - 您只需让其他人订阅排他观察者的过滤输出。

“默认”更难 - RX 编程是一种函数式编程,订阅者彼此不知道,而根据定义,拥有“默认”订阅者意味着在观察者之间共享一些状态。拥有共享状态的一种方法是使用来自TPL DataFlow的ConcurrentBag或 BufferBlock创建生产者/消费者队列。另一种方法是使用这样的类将“已处理”状态附加到事件本身:

public class Handled<T>
{
    public bool IsHandled { get; set; }
    public T Data { get; set; }
}

无论如何,在使用“默认”处理程序之前,您必须给观察者一些时间做出反应。下面的代码说明了“独占”和“默认”的概念:

        var source = new[] {0, 1, 2, 3, 4}.ToObservable();
        var afterExclusive = source
            .Where(x =>
                       {
                           if (x == 0)
                           {
                               Console.WriteLine("exclusive");
                               return false;
                           }
                           return true;
                       })
            .Select(x => new Handled<int> {Data = x})
            .Publish(); // publish is a must otherwise 
        afterExclusive  // we'll get non shared objects
            .Do(x => { x.IsHandled = true; })
            .Subscribe();
        afterExclusive
            .Delay(TimeSpan.FromSeconds(1))
            .Where(x => !x.IsHandled)
            .Subscribe(x => Console.WriteLine("missed by all {0}", x));
        afterExclusive.Connect(); 
于 2011-08-03T04:32:01.400 回答
0

我不确定我是否完全了解您的情况,但这对您有何影响:

IObservable<Event> streamOfEvents.SelectMany(x => {
    if (matchesExclusiveItem1(x)) {
        x += exclusiveItem1Handler;
        return Observable.Empty<Event>();
    }

    // Default case
    return Observable.Return(x);
}).Subscribe(x => {
    // Default case
    x += defaultHandler;
});

我正在使用“事件对象”,因为这是您指定的,但使用它可能会更好IObservable<IObservable<T>>- 这个选择器有副作用(连接事件),这不太好。

于 2011-07-22T20:16:02.143 回答