2

我有一个来自 BlockCollection 的 Observable,我像队列一样使用它

IObservable<ProcessHoldTransactionData> GetObservable()
{     
    _queue.GetConsumingEnumerable().ToObservable(TaskPoolScheduler.Default);
}

并订阅他:

void StartSubscription()
{
    _subscription = =  GetObservable().Subscribe(
                data => OnNextSubscribe(data),
                ex => _logger.Error("Error"),
                () => _logger.Warn("Complete"));
}

现在我有另一个 Observable:

var timer = Observable.Interval(TimeSpan.FromSeconds(60));
_subscriptionTimer = timer.Subscribe(tick =>
{
    OnTimerNextSubscribe();
});

想当 OnTimerNextSubscribe开始停止订阅_subscription并在 OnTimerNextSubscribe 完成时更新它。

最好的做法是什么?
我应该处置_subscription并调用StartSubscription()

4

1 回答 1

1

基本上有两种选择:一种是处理然后重新启动,另一种是创建某种可观察的开/关信号,然后进行_subscription相应的过滤:

void StartSubscription(Observable<bool> onOffSignal)
{
    _subscription = =  GetObservable()
        .WithLatestFrom(onOffSignal, (s, b) => b ? Observable.Return(s) : Observable.Empty(s))
        .Merge()
        .Subscribe(
                data => OnNextSubscribe(data),
                ex => _logger.Error("Error"),
                () => _logger.Warn("Complete")
        );
}
于 2017-01-15T20:17:14.257 回答