2

我有一个可观察的消息序列。有一组订阅者可以处理这些消息。每个订阅者都有一个执行优先级。每条消息必须由从当前订阅的订阅者列表中选择的最高优先级订阅者处理一次。订阅者不断地订阅/取消订阅序列,因此我们在构建序列时不知道订阅者的数量和优先级。使用 rx 是一种可能/可行的解决方案吗?

为了显示:

public class Message
{
    public string Value { get; set; }
    public bool IsConsumed { get; set; }
}

var subject = new Subject<Message>();
var sequence = subject.Publish().RefCount();

Action<Message, int> subscriber = (m, priority) =>
{
    if (!m.IsConsumed)
    {
        m.IsConsumed = true;
        Trace.WriteLine(priority);
    }
};

var s2 = sequence.Priority(2).Subscribe(m => subscriber(m, 2));
var s1 = sequence.Priority(1).Subscribe(m => subscriber(m, 1));

subject.OnNext(new Message()); // output: 1

s1.Dispose();
subject.OnNext(new Message()); // output: 2

使该解决方案起作用的缺失部分是 Rx 库中不存在的 Priority 方法。

4

1 回答 1

2

这是一个非常有趣的问题......

所以,首先:我不知道有任何内在的 Rx 运算符可以实现类似于您在此Priority扩展中想要的“路由”效果。

也就是说,我今天午餐时在 LINQPad 上玩耍,并想出了一个(非常)hacky 的概念证明,它似乎有效:

一、你的消息类

public class Message
{
    public string Value { get; set; }
    public bool IsConsumed { get; set; }
}

接下来,扩展方法wrapper-class:

public static class Ext
{    
    public static PrioritizedObservable<T> Prioritize<T>(this IObservable<T> source)
    {
        return new PrioritizedObservable<T>(source);
    }
}

这是什么PrioritizedObservable<T>

public class PrioritizedObservable<T> 
   : IObservable<T>, IObserver<T>, IDisposable
{
    private IObservable<T> _source;
    private ISubject<T,T> _intermediary;
    private IList<Tuple<int, Subject<T>>> _router;

    public PrioritizedObservable(IObservable<T> source)
    {
        // Make sure we don't accidentally duplicate subscriptions
        // to the underlying source
        _source = source.Publish().RefCount();

        // A proxy from the source to our internal router
        _intermediary = Subject.Create(this, _source);
        _source.Subscribe(_intermediary);        

        // Holds per-priority subjects
        _router = new List<Tuple<int, Subject<T>>>();
    }

    public void Dispose()
    {
        _intermediary = null;
        foreach(var entry in _router)
        {
            entry.Item2.Dispose();
        }
        _router.Clear();
    }

    private ISubject<T,T> GetFirstListener()
    {
        // Fetch the first subject in our router
        // ordered by priority 
        return _router.OrderBy(tup => tup.Item1)
            .Select(tup => tup.Item2)
            .FirstOrDefault();
    }

    void IObserver<T>.OnNext(T value)
    {
        // pass along value to first in line
        var nextListener = GetFirstListener();
        if(nextListener != null)
            nextListener.OnNext(value);
    }

    void IObserver<T>.OnError(Exception error)
    {
        // pass along error to first in line
        var nextListener = GetFirstListener();
        if(nextListener != null)
            nextListener.OnError(error);
    }

    void IObserver<T>.OnCompleted()
    {
        var nextListener = GetFirstListener();
        if(nextListener != null)
            nextListener.OnCompleted();
    }

    public IDisposable Subscribe(IObserver<T> obs)
    {
        return PrioritySubscribe(1, obs);
    }

    public IDisposable PrioritySubscribe(int priority, IObserver<T> obs)
    {
        var sub = new Subject<T>();
        var subscriber = sub.Subscribe(obs);
        var entry = Tuple.Create(priority, sub);
        _router.Add(entry);
        _intermediary.Subscribe(sub);
        return Disposable.Create(() => 
        {
            subscriber.Dispose();
            _router.Remove(entry);
        });
    }
}

和一个测试工具:

void Main()
{
    var subject = new Subject<Message>();
    var sequence = subject.Publish().RefCount().Prioritize();

    Action<Message, int> subscriber = (m, priority) =>
    {
        if (!m.IsConsumed)
        {
            m.IsConsumed = true;
            Console.WriteLine(priority);
        }
    };

    var s3 = sequence.PrioritySubscribe(3, Observer.Create<Message>(m => subscriber(m, 3)));
    var s2 = sequence.PrioritySubscribe(2, Observer.Create<Message>(m => subscriber(m, 2)));
    var s1 = sequence.PrioritySubscribe(1, Observer.Create<Message>(m => subscriber(m, 1)));
    var s11 = sequence.PrioritySubscribe(1, Observer.Create<Message>(m => subscriber(m, 1)));

    subject.OnNext(new Message()); // output: 1

    s1.Dispose();
    subject.OnNext(new Message()); // output: 1
    s11.Dispose();

    subject.OnNext(new Message()); // output: 2
    s2.Dispose();
    subject.OnNext(new Message()); // output: 3

    sequence.Dispose();

}
于 2013-02-26T00:46:08.697 回答