0

我对 Rx 相当陌生,并且无法找到解决问题的方法。我正在使用 Rx 通过客户端库开始下载。目前它看起来像:

private void DownloadStuff(string descriptor, Action<Stuff> stuffAction)
{
    this.stuffDownloader.GetStuffObservable(descriptor).Subscribe(x => stuffAction(x))
}

其中 stuffDownloader 是客户端库中定义的下载逻辑的包装器。但是我遇到了一个问题,就是调用DownloadStuff太多,导致下载很多,系统不堪重负。现在我想做的是

private void DownloadStuff(string descriptor, Action<Stuff> stuffAction)
{
    this.stuffDownloader.GetStuffObservable(descriptor)
        .SlowSubscribe(TimeSpan.FromMilliSeconds(50))
        .Subscribe(x => stuffAction(x))
}

其中,SlowSubscribe 是 Rx 操作的某种组合,仅在某个时间间隔订阅。

通常我只会将这些 DownloadStuff 调用放在一个队列中,并在一段时间内将它们拉出来,但我最近一直在尝试通过 Rx 做更多事情。我想到了三个解决方案:

  1. 此功能存在并且可以在订阅端完成。
  2. 这是可能的,但下载器的基础设施不正确,应该改变(即 stuffDownloader 需要表现不同)
  3. 这不应该用 Rx 来做,用另一种方式来做。

我想到#2 可以通过将描述符的 IObservable 传递给客户端库并以某种方式减慢描述符进入 Observable 的速度。

4

1 回答 1

-1

理论上,您可以使用 Rx 将您的请求视为事件。通过这种方式,您可以利用 Rx 的序列化特性来排队下载。

我认为您的网络层(或 stuffDownloader)会为您执行此操作,但是如果您想加入我的行列……这就是我想出的(耶哈!!)

1. 不要传递 Action,使用 Rx!!您基本上在这里失去了错误处理,并为奇怪的未处理异常做好了准备。

private void DownloadStuff(string descriptor, Action<Stuff> stuffAction)

变成

private IObservable<Stuff> DownloadStuff(string descriptor)

2. 现在我们只有一个方法调用另一个方法。似乎毫无意义。抛弃抽象。

3. 修复底层。对我来说, stuffDownloader 没有做好它的工作。更新接口以采用 IScheduler。现在你可以给它传递一个专门的 EventLoopScheduler 来强制工作的序列化

public IObservable<Stuff> GetStuffObservable(string descriptor, IScheduler scheduler)

4. 修正实施?当您想序列化您的请求时(hmmmm ....),您可以使调用同步。

private Stuff GetSync(string description)
{
    var request = (HttpWebRequest)WebRequest.Create("http://se300328:90/");
    var response =request.GetResponse();
    var stuff = MapToStuff(response);
    return stuff;
}

现在你只需在你的其他方法中调用它

public IObservable<Stuff> GetStuffObservable(string descriptor, ISchedulerLongRunning scheduler)
{
    return Observable.Create<Stuff>(o=>
        {
            try
            {
                var stuff = GetStuff(description);
                o.OnNext(stuff);
                o.OnCompleted();
            }
            catch(Exception ex)
            {
                o.OnError(ex);
            }
            return Disposable.Empty(); //If you want to be sync, you cant cancel!
        })
        .SubscribeOn(scheduler);
}

但是,完成所有这些之后,我确信这不是您真正想要的。我希望系统中的其他地方存在问题。

另一种选择是考虑使用 Merge 运算符及其最大并发功能?

于 2013-04-25T18:47:00.063 回答