这是一个非常有趣的问题......
首先:据我所知,Rx 中没有任何内置运算符能够实现您希望这个 Priority 扩展具备的“路由”效果。
话虽如此,我今天午餐时在 LINQPad 里试验了一下,做出了一个(非常)hacky 的概念验证,看起来是可行的:
第一步,您的消息类:
/// <summary>
/// Simple mutable message object passed through the prioritized sequence.
/// </summary>
public class Message
{
    /// <summary>Gets or sets the string value carried by the message.</summary>
    public string Value { get; set; }

    /// <summary>
    /// Gets or sets a flag that a handler can raise to mark the message as already handled.
    /// </summary>
    public bool IsConsumed { get; set; }
}
接下来,是提供扩展方法的包装类:
/// <summary>
/// Extension methods that expose priority-based routing on top of an observable sequence.
/// </summary>
public static class Ext
{
    /// <summary>
    /// Wraps <paramref name="source"/> in a new <see cref="PrioritizedObservable{T}"/>.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="source">The sequence to wrap.</param>
    /// <returns>A new wrapper constructed around <paramref name="source"/>.</returns>
    public static PrioritizedObservable<T> Prioritize<T>(this IObservable<T> source)
    {
        var prioritized = new PrioritizedObservable<T>(source);
        return prioritized;
    }
}
那么,这个 PrioritizedObservable<T> 又是什么呢?
/// <summary>
/// Wraps an observable sequence and hands every notification to the registered subscriber with the
/// lowest priority number (ties go to the one registered first).
/// </summary>
/// <remarks>
/// NOTE: each per-subscriber subject is additionally subscribed to the shared source through the
/// intermediary's observable side, so every subscriber still receives every notification directly;
/// the first-in-line subscriber therefore sees a notification twice (routed copy + direct copy).
/// Observers are expected to cope with that, e.g. via a "consumed" flag on the message.
/// The routed copy presumably arrives first because the router's own subscription is created in the
/// constructor, before any subscriber exists - TODO confirm against the Rx delivery order.
/// This type performs no synchronization of its own.
/// </remarks>
/// <typeparam name="T">Element type of the sequence.</typeparam>
public class PrioritizedObservable<T>
    : IObservable<T>, IObserver<T>, IDisposable
{
    private readonly IObservable<T> _source;
    private readonly ISubject<T, T> _intermediary;
    private readonly IList<Tuple<int, Subject<T>>> _router;

    // Every subscription this instance holds on the shared source: the router's own one plus
    // one per registered subscriber. Tracked so that none of them outlives its owner.
    private readonly CompositeDisposable _upstreamLinks;

    private bool _disposed;

    /// <summary>
    /// Creates the wrapper and immediately subscribes its internal router to the source.
    /// </summary>
    /// <param name="source">The sequence whose notifications are to be routed.</param>
    public PrioritizedObservable(IObservable<T> source)
    {
        // Holds per-priority subjects. Must exist BEFORE we subscribe below: a source that
        // emits synchronously during Subscribe would otherwise reach a null router.
        _router = new List<Tuple<int, Subject<T>>>();
        _upstreamLinks = new CompositeDisposable();

        // Make sure we don't accidentally duplicate subscriptions
        // to the underlying source
        _source = source.Publish().RefCount();

        // A proxy from the source to our internal router: values pushed into it land in this
        // instance's IObserver<T> members, subscriptions made on it go to the shared source.
        _intermediary = Subject.Create(this, _source);

        // Keep the handle so Dispose() can actually detach from the source.
        _upstreamLinks.Add(_source.Subscribe(_intermediary));
    }

    /// <summary>
    /// Detaches from the source, disposes every per-subscriber subject and empties the router.
    /// Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        // Cut all links to the source first so nothing is pushed into a disposed subject.
        _upstreamLinks.Dispose();

        foreach (var entry in _router)
        {
            entry.Item2.Dispose();
        }

        _router.Clear();
    }

    /// <summary>
    /// Returns the subject of the subscriber that is first in line, or null when nobody is registered.
    /// </summary>
    private ISubject<T, T> GetFirstListener()
    {
        // OrderBy is a stable sort, so among equal priorities the earliest registration wins.
        return _router.OrderBy(tup => tup.Item1)
                      .Select(tup => tup.Item2)
                      .FirstOrDefault();
    }

    /// <summary>Passes the value along to the first-in-line subscriber, if there is one.</summary>
    void IObserver<T>.OnNext(T value)
    {
        var nextListener = GetFirstListener();
        if (nextListener != null)
        {
            nextListener.OnNext(value);
        }
    }

    /// <summary>Passes the error along to the first-in-line subscriber, if there is one.</summary>
    void IObserver<T>.OnError(Exception error)
    {
        var nextListener = GetFirstListener();
        if (nextListener != null)
        {
            nextListener.OnError(error);
        }
    }

    /// <summary>Passes completion along to the first-in-line subscriber, if there is one.</summary>
    void IObserver<T>.OnCompleted()
    {
        var nextListener = GetFirstListener();
        if (nextListener != null)
        {
            nextListener.OnCompleted();
        }
    }

    /// <summary>Registers <paramref name="obs"/> with priority 1.</summary>
    /// <param name="obs">The observer to register.</param>
    /// <returns>A handle that unregisters the observer when disposed.</returns>
    public IDisposable Subscribe(IObserver<T> obs)
    {
        return PrioritySubscribe(1, obs);
    }

    /// <summary>
    /// Registers <paramref name="obs"/> under the given priority; lower numbers are served first.
    /// </summary>
    /// <param name="priority">Routing priority of the observer.</param>
    /// <param name="obs">The observer to register.</param>
    /// <returns>
    /// A handle that detaches the observer, removes it from the router and releases its link to the source.
    /// </returns>
    /// <exception cref="ObjectDisposedException">This instance has already been disposed.</exception>
    public IDisposable PrioritySubscribe(int priority, IObserver<T> obs)
    {
        if (_disposed)
        {
            // Fail before touching the router instead of half-registering the observer.
            throw new ObjectDisposedException(GetType().Name);
        }

        var sub = new Subject<T>();
        var subscriber = sub.Subscribe(obs);
        var entry = Tuple.Create(priority, sub);
        _router.Add(entry);

        // Subscribing on the intermediary attaches the subject to the shared source; keep the
        // handle so the link is released together with the subscription.
        var upstreamLink = _intermediary.Subscribe(sub);
        _upstreamLinks.Add(upstreamLink);

        return Disposable.Create(() =>
        {
            subscriber.Dispose();
            // Remove() also disposes the link; it is a no-op once the whole wrapper is disposed.
            _upstreamLinks.Remove(upstreamLink);
            _router.Remove(entry);
        });
    }
}
最后,是一个用于验证的测试程序:
// LINQPad-style entry point: registers four observers at priorities 3, 2, 1 and 1, then pushes
// messages while disposing the subscriptions one by one. The "expected output" notes are the
// values stated by the original author.
void Main()
{
    var input = new Subject<Message>();
    var prioritized = input.Publish().RefCount().Prioritize();

    // Builds an observer that claims a not-yet-consumed message and prints its own priority.
    Func<int, IObserver<Message>> claimAndReport = level =>
        Observer.Create<Message>(msg =>
        {
            if (msg.IsConsumed)
            {
                return;
            }

            msg.IsConsumed = true;
            Console.WriteLine(level);
        });

    var third = prioritized.PrioritySubscribe(3, claimAndReport(3));
    var second = prioritized.PrioritySubscribe(2, claimAndReport(2));
    var firstA = prioritized.PrioritySubscribe(1, claimAndReport(1));
    var firstB = prioritized.PrioritySubscribe(1, claimAndReport(1));

    input.OnNext(new Message()); // expected output: 1

    firstA.Dispose();
    input.OnNext(new Message()); // expected output: 1

    firstB.Dispose();
    input.OnNext(new Message()); // expected output: 2

    second.Dispose();
    input.OnNext(new Message()); // expected output: 3

    // The priority-3 subscription is never disposed individually; disposing the wrapper covers it.
    prioritized.Dispose();
}