2

我有一个网络服务,我想在布尔值设置为 true 后立即查询更新,然后每 5 分钟重新检查一次。

这是我目前的观察:

        _queryDisposable = Observable
            .Interval(TimeSpan.FromMinutes(5))
            .ObserveOn(Scheduler.ThreadPool)
            .Where(i => IsProcessing)     // IsProcessing is the bool value
            .Subscribe(GetFeeds, OnError, OnComplete);

这个 observable 将IsProcessing每 5 分钟检查一次 bool 是真还是假,GetFeeds如果是真则调用。

我认为获得预期效果的唯一方法是创建IsProcessing一个属性支持的字段并使用 2 个 observable 进行处理,如下所示:

    private bool _isProcessing;

    public bool IsProcessing
    {
        get { return _isProcessing; }
        set
        {
            if (_isProcessing == value)
                return;

            _isProcessing = value;

            if (!value)
            {
                if(_queryDisposable != null)
                    _queryDisposable.Dispose();
                _queryDisposable = null;
            }
            else
            {
                Observable
                    .Range(0,1)
                    .ObserveOn(Scheduler.ThreadPool)
                    .Subscribe(GetFeedsSafe, OnError, OnComplete);

                _queryDisposable = Observable
                    .Interval(TimeSpan.FromMinutes(5))
                    .ObserveOn(Scheduler.ThreadPool)
                    .Subscribe(GetFeedsSafe, OnError, OnComplete);
            }
        }
    }

我认为这不是一个非常优雅的解决方案,所以我想知道是否有更好的方法来实现这种效果?

4

3 回答 3

3

我是否遗漏了什么,或者您不会在启动 Observable 之前立即自己调用订阅?

    Subscribe(GetFeeds, OnError, OnComplete);
    // Or perhaps, if you want it to be async,
    new TaskFactory().StartNew(()=>Subscribe(GetFeeds, OnError, OnComplete));

    _queryDisposable = Observable
        .Interval(TimeSpan.FromMinutes(5))
        .ObserveOn(Scheduler.ThreadPool)
        .Where(i => IsProcessing)     // IsProcessing is the bool value
        .Subscribe(GetFeeds, OnError, OnComplete);

编辑:纯 Rx,来自http://social.msdn.microsoft.com/Forums/sa/rx/thread/c4acaf34-3136-4206-a6f9-ef5afba74b2b

Observable.Interval(TimeSpan.FromMinutes(5))
        .StartWith(-1L)
        .ObserveOn(Scheduler.ThreadPool)
        .Where(i => IsProcessing)     // IsProcessing is the bool value
        .Subscribe(GetFeeds, OnError, OnComplete);
于 2012-06-25T14:40:38.563 回答
2

要立即开始排序并在给定时间内仍然产生一个值,您有两个选择:

  1. Observable.Interval(TimeSpan.FromMinutes(5)).StartWith(-1)
  2. Observable.Timer(TimeSpan.FromMinutes(0), TimeSpan.FromMinutes(5))

接下来我假设您想在 IsProcessing 值设置为 false 时取消计时器,您还想在它设置回 true 时重新启动序列。无论何时设置 IsProcessing 值,上面的两个答案都会运行 5 分钟的间隔。如果您设置 IsProcessing=true,序列将开始,如果一分钟后您设置 IsProcessing=false,然后 IsProcessing=true,您仍然需要等待 4 分钟才能再次查询。我想这不是你想要的?

所以我们希望这会在属性更改时触发。@SPFiredrake 在建议 INPC 并将事件转换为 Observable 序列方面做得很好。周围有很多小片段可以帮助您做到这一点。假设我们使用一个扩展方法 PropertyChanges,我们可以编写这个非常简单的查询。

var isProcessingValues = this.PropertyChanges(x=>x.IsProcessing);
isProcessingValues
    .Where(isProcessing=>isProcessing)
    .SelectMany(_=> 
        Observable.Timer(TimeSpan.Zero, TimeSpan.FromMinutes(5))
                  .TakeUntil(isProcessingValues.Where(isProcessing=>!isProcessing))
    )
    .Select(_=>GetFeeds())  //What does GetFeeds actually return?
    .SubscribeOn(Scheduler.NewThread)
    .Subscribe(feedData=>Console.WriteLine(feedData), 
        ex=>Console.WriteLine (ex),
        ()=>Console.WriteLine (completed));

慢慢地走过这个:

  1. 当 IsProcessing 属性发生变化时,将新值推送到序列中 (isProcessingValues)
  2. 当序列值为真时,启动一个计时器序列,该序列现在每 5 分钟产生一个值。
  3. 如果 isProcessingValues 序列产生 false 值,则计时器序列应停止
  4. 每次计时器滴答声(当 IsProcess 设置为 true 时,每 5 分钟一次),调用 GetFeeds
  5. 确保我们的序列在不同的线程上订阅(不阻塞)。如果这不是必需的,请删除。
  6. 从提要中获取数据并对其进行处理。
于 2012-07-06T12:25:26.097 回答
0

您是否考虑过让 IsProcessing 变量触发一个道具更改事件,然后您可以收听该事件?

var processing = Observable
    .FromEventPattern<PropertyChangedEventArgs>(this, "PropertyChanged")
    .Where(tr => tr.EventArgs.Property == "IsProcessing" && ((Type)tr.Sender).IsProcessing);

_queryDisposable = Observable
    .Interval(TimeSpan.FromMinutes(5))
    .ObserveOn(Scheduler.ThreadPool)
    .And(processing) // Will only fire when both sequences have an available value.
    .Subscribe(GetFeeds, OnError, OnComplete);

这也确保了如果它正在处理,它只会触发一次,因为处理 observable 一次只提供一个值(对于每个 IsProcessing 更改为true),所以如果它仍在处理中,您不必担心它会再次触发5分钟后。

于 2012-06-25T14:47:19.667 回答