3

我有一组具有以下接口的类:

public interface RoutedEventReceiver<T>
{
    IDisposable Apply(IObservable<T> stream);
    bool ShouldForwardEvent(T anEvent);
}

我想做的是维护这些类的堆栈,每个事件都通过ShouldForwardEvent(T)谓词过滤,并将结果IObservable<T>传递给下一个接收者。我还希望能够在程序运行时推送和弹出新的接收器(有时我可能想从堆栈移动到其他集合,但现在堆栈就足够了)。

我目前所拥有的确实有效,但我觉得它不是很“Rx”。我确信必须有一种方法可以在没有所有这些命令式逻辑的情况下做我想做的事:

private void Refresh()
{
    // _subscriptions is a list of previous subscriptions
    foreach (var subscription in _subscriptions)
        subscription.Dispose();
    _subscriptions.Clear();

    // _stream is my stream of incoming events
    if (_stream != null)
    {
        var stream = _stream;

        foreach (var eventReceiver in _eventReceivers)
        {
            // add the subscription so it can be disposed next Refresh()
            _subscriptions.Add(eventReceiver.Apply(stream));

            // filter the stream for the next event receiver
            stream = stream.Where(eventReceiver.ShouldForwardEvent);
        }
    }
}

Push每当我或Pop在堆栈上时,都会调用上述方法。

是否有一种更简洁、更实用的方式来表达上述意图?我尝试过.Publish(),但收效甚微——也许我还不够了解。

4

3 回答 3

0

下面的类(名为 CORStack for Chain Of Responsibility* Stack)试图做你想做的事。在内部,它向流中添加了一个 ShouldHandle bool 并使用它来确定是否进行处理。它公开了标准Push、、PopPeek方法。

public sealed class CORStack<T>
{
    Stack<StackFrame> _handlers;

    public CORStack(IObservable<T> source)
    {
        _handlers = new Stack<StackFrame>();
        _handlers.Push(new StackFrame(
            source.Select(t => new ShouldHandleWrapper(t, true)),
            new Handler<T>(new Action<T>(t => { }), true)));
    }

    public void Push(Handler<T> handler)
    {
        _handlers.Push(new StackFrame(_handlers.Peek().Observable, handler));
    }

    public Handler<T> Peek()
    {
        return _handlers.Peek().Handler;
    }

    public Handler<T> Pop()
    {
        var frame = _handlers.Pop();
        frame.Dispose();
        return frame.Handler;
    }

    class StackFrame : IDisposable
    {
        IDisposable _unsub;

        public IObservable<ShouldHandleWrapper> Observable { get; private set; }
        public Handler<T> Handler { get; private set; }

        public StackFrame(IObservable<ShouldHandleWrapper> topOfStack, Handler<T> handler)
        {
            _unsub = topOfStack.Subscribe(shouldHandle =>
                {
                    if (shouldHandle.ShouldHandle)
                        handler.Action.Invoke(shouldHandle.Value);
                });
            Observable = topOfStack.Select(shouldHandle =>
                new ShouldHandleWrapper(shouldHandle.Value, shouldHandle.ShouldHandle && handler.Forward));
            Handler = handler;
        }

        public void Dispose()
        {
            _unsub.Dispose();
        }
    }

    class ShouldHandleWrapper
    {
        public readonly T Value;
        public readonly bool ShouldHandle;

        public ShouldHandleWrapper(T value, bool shouldHandle)
        {
            Value = value;
            ShouldHandle = shouldHandle;
        }
    }
}

public class Handler<T>
{
    public Action<T> Action { get; set; }
    public bool Forward { get; set; }

    public Handler(Action<T> action, bool forward)
    {
        Action = action;
        Forward = forward;
    }
}

*我意识到这不是一个责任链,但想不出更好的名字 atm。

于 2013-06-24T01:38:54.423 回答
0

我已经设法使这种Publish方法奏效,但除了摆脱保留以下列表的需要之外,它并没有给我带来太多好处IDisposables

private void Refresh()
{
    _published.DisposeIfNotNull();

    if (_stream != null)
    {
        var connectable = _stream.Publish();
        _published = connectable.Connect();
        var stream = connectable.AsObservable();

        foreach (var eventReceiver in _eventReceivers)
        {
            eventReceiver.Apply(stream);
            stream = stream.Where(eventReceiver.ShouldForwardEvent);
        }
    }
}
于 2013-06-23T17:05:17.597 回答
0

这是我实际使用的情况Subjects。为每个处理程序创建一个主题,然后订阅流并根据需要循环通过处理程序传递事件。这避免了不断地取消订阅/重新订阅流(以及Refresh方法),这并不总是合适的。我们lock用来防止在新值通过流的同时添加或删除新接收器。如果您可以保证不会发生这种情况,那么您可以删除这些lock语句。

public class YourClass<T> : IDisposable
{
    private readonly Stack<Tuple<Subject<T>, RoutedEventReceiver<T>, IDisposable> _handlers;
    private readonly IObservable<T> _stream;
    private readonly IDisposable _streamSubscription;

    public YourClass(IObservable<T> stream)
    {
        _handlers = new Stack<Tuple<Subject<T>, RoutedEventReceiver<T>, IDisposable>();
        _stream = stream;
        _streamSubscription = stream.Subscribe(OnNext, OnError, OnCompleted);
    }

    public void Dispose()
    {
        _streamSubscription.Dispose();

        lock (_handlers)
        {
            foreach (var h in _handlers)
            {
                h.Item3.Dispose();
                h.Item1.Dispose();
            }
            _handlers.Clear();
        }
    }

    private void OnNext(T value)
    {
        lock (_handlers)
        {
            for (var h in _handlers)
            {
                h.Item1.OnNext(value);
                if (!h.Item2.ShouldForwardEvent(value)) break;
            }
        }
    }

    private void OnError(Exception e)
    {
        lock (_handlers)
        {
            for (var h in _handlers) { h.Item1.OnError(e); }
        }
    }

    private void OnCompleted()
    {
        lock (_handlers)
        {
            for (var h in _handlers) { h.Item1.OnCompleted(); }
        }
    }

    public void Push(RoutedEventReceiver<T> handler)
    {
        lock (_handlers)
        {
            var subject = new Subject<T>;
            _handlers.Push(Tuple.Create(subject, handler, handler.Apply(subject)));
        }
    }

    public RoutedEventReceiver<T> Pop()
    {
        lock (_handlers)
        {
            var handler = _handlers.Pop();
            handler.Item3.Dispose();
            handler.Item1.Dispose();
            return handler.Item2;
        }
    }
}
于 2013-06-24T13:58:06.193 回答