7

给定一个可观察的源,通过轮询低级设备的(a的变化)状态生成......

// observable source metacode:
IObservable<DeviceState> source = Observable.Interval(TimeSpan.FromSeconds(0.5))
    .Select(tick => new DeviceState(_device.ReadValue()))
    .DistinctUntilChanged();

...以及更新 UI 的消费者...

// UI metacode:
service.GetObservableDeviceStates()
    .Subscribe(state => viewModel.CurrentState = state.ToString());

...我需要在源“不活动”x 秒后执行自定义操作,而不会中断对源的订阅。像这样的东西:

// UI metacode:
service.GetObservableDeviceStates()
    .DoOnTimeout(TimeSpan.FromSeconds(x), () => viewModel.CurrentState = "Idle")
    .Subscribe(state => viewModel.CurrentState = state.ToString());

最佳实践是什么?想到的可能解决方案是(我是 Rx 菜鸟):

  1. 缓冲区(即使它不那么可读)
  2. 解决这个超时重载
  3. 当没有任何变化时返回一些特殊的“服务端”(而不是使用 DistinctUntilChanged)并在 UI 代码上处理它:

    service.GetObservableDeviceStates() .Subscribe(state => viewModel.CurrentState = state.Special ? "Idle" : state.ToString());

编辑:如答案中所述,解决方案是:

        service.GetObservableDeviceStates()
            .Do(onNext)
            .Throttle(TimeSpan.FromSeconds(x))
            .Subscribe(onTimeout);

EDIT2(警告)

如果 onNext 和 onTimeout 更新 UI 组件,为了避免 CrossThreadExceptions需要两个ObserveOn(uiSynchronizationContext),因为 Throttle 在另一个线程上工作!

        service.GetObservableDeviceStates()
            .ObserveOn(uiSynchronizationContext)
            .Do(onNext)
            .Throttle(TimeSpan.FromSeconds(x))
            .ObserveOn(uiSynchronizationContext)
            .Subscribe(onTimeout);
4

2 回答 2

7

超时或多或少适用于表示单个异步操作的可观察对象 - 例如,返回默认值或OnError所述可观察对象在一定时间内没有通知您。

您正在寻找的运算符是Throttle,即使它一开始可能看起来不像。Throttle(p)给你一个流,当源流没有产生一个值时产生一个值 period p

与现有代码并行,您可以使用source.Throttle(period).Do(...side effect).

于 2012-10-08T18:16:07.870 回答
5

我个人会为此避免使用 Do 方法。它确实使此示例中的代码相当简单,但我发现一旦使用“Do”潜入代码库,您很快就会有意大利面条。

您还可以考虑使用 Amb、Timer、TakeUntil、Throttle 等的组合来获得您正在寻找的结果并仍然维护 Monad*。或者简单来说,我假设您理想地希望获得一系列状态值,并且不需要在您的代码中放置一个计时器(即,将其卸载到服务中)。

public IObservable<DeviceStatus> GetObservableDeviceStates(TimeSpan silencePeriod)
{
    return Observable.Create<DeviceStatus>(
    o=>
    {
        var idle = Observable.Timer(silencePeriod).Select(_=>new DeviceStatus("Idle"));

        var polledStatus = Observable.Interval(TimeSpan.FromSeconds(0.5))
                        .Select(tick => new DeviceStatus(_device.ReadValue()))
                        .DistinctUntilChanged()
                        .Publish();

        var subscription = (from status in polledStatus
                            from cont in Observable.Return(status).Concat(idle.TakeUntil(polledStatus))
                            select cont)
                     .Subscribe(o);

        return new CompositeDisposable(subscription, polledStatus.Connect());
    });
}

一旦发生指定的更改静默期,此代码现在使服务返回空闲状态值。

这意味着您的 UI 元代码保持简单,与 DeviceStatus 相关的逻辑保持在其所属的位置

// UI metacode:
service.GetObservableDeviceStates(TimeSpan.FromSeconds(2))
    .Subscribe(state => viewModel.CurrentState = state.ToString());
于 2012-10-09T10:31:28.123 回答