0

我有以下代码:

var observable = ... subscribe to event here ...

var windows = observable.Window(TimeSpan.FromSeconds(240));

aggregatedWindows = windows.SelectMany(
    window => window.Aggregate(new Context(), AggregateContext));

subscription = aggregatedWindows.Subscribe(OnWindow);

... later

subscription.Dispose();

想象一个场景,当我正在处理一个窗口并且有人要求我的应用程序应该关闭时。我将处理此订阅,这将停止正在处理的事件,但是我也将丢失最后一个信息窗口。

我不确定解决这个问题的最佳方法是什么......

我可以将本地状态与最后看到的窗口一起存储,因为它通过聚合函数传递(但这似乎是错误的)......

任何帮助将不胜感激!

4

2 回答 2

0

您可以对窗口进行操作,而不是保留聚合的订阅 - 这是您主要希望保持连接以使最后一个窗口到达的内容,并使用超时断开连接,以防分区时间过长。

这里使用了一个单独的类,因为 usingCreate使它成为 Auto Detach - 在进行 dispose 调用后立即断开观察者的连接。所以从根本上说,Dispose 的含义就是这里发生了变化。

    public static IObservable<T> DeferDisconnection<T>(this IObservable<T> observable, TimeSpan timeout)
    {
        return new ClosingObservable<T>(observable, timeout);
    }


    public class ClosingObservable<T> : IObservable<T>
    {

        private readonly IConnectableObservable<T> Source;
        private readonly IDisposable Subscription;
        private readonly TimeSpan Timeout;

        public ClosingObservable(IObservable<T> observable, TimeSpan timeout)
        {
            Timeout = timeout;
            Source = observable.Publish();
            Subscription = Source.Connect();
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            Source.Subscribe(observer);

            return Disposable.Create(() => Source.Select(_ => new Unit())
                                                 .Amb(Observable.Timer(Timeout).Select(_ => new Unit()))
                                                 .Subscribe(_ => Subscription.Dispose())
                                                 );
        }
    }

测试:

            var disposable =
            Observable.Interval(TimeSpan.FromSeconds(2))
                      .Do(Console.WriteLine)
                      .DeferDisconnection(TimeSpan.FromSeconds(5))
                      .Subscribe();

            Console.ReadLine();

            disposable.Dispose();

            Console.ReadLine();
于 2012-06-26T22:11:58.720 回答
-1

这是有效的,正如最后显示的部分窗口所证实的那样。

class Program
{
    public class Context
    {
        public int count;
    }

    static Context AggregateContext(Context c, long i)
    {
        c.count++;
        return c;
    }

    static void OnWindow(Context c) { Console.WriteLine(c.count); }

    static void Main(string[] args)
    {
        var canceled = new Subject<bool>();

        var observable = Observable.Interval(TimeSpan.FromSeconds(.1)).TakeUntil(canceled);

        var windows = observable.Window(TimeSpan.FromSeconds(3));

        var aggregatedWindows = windows.SelectMany(
            window => window.Aggregate(new Context(), AggregateContext));

        var subscription = aggregatedWindows.Subscribe(OnWindow);

        Thread.Sleep(TimeSpan.FromSeconds(10));

        canceled.OnNext(true);
        subscription.Dispose();

        Console.WriteLine( @"Output should have been something like 30,30,30,30,10" );
        Console.ReadLine();
    }
} 
于 2012-06-26T22:40:26.563 回答