2

我是响应式编程的新手,遇到了一个问题......

我的代码如下所示:

    IsBusy = true;
    service.BeginGetClients(param, c => 
    {
        var r = service.EndGetClients(c);
        if(!CheckResult(r))
        {
            service.BeginGetCachedClients(param, c2 =>
                {
                    var r2 = service.EndGetCachedClients(c2);
                    if(CheckResult(r2))
                        ShowMessage("clients valid");
                    else
                        ShowMessage("clients not valid");

                    UpdateClients(r2);

                    service.BeginUpdateClients(r2, c3 =>
                        {
                            var b = service.EndUpdateClients(c3);
                            if(b)
                                ShowMessage("clients updated");
                            else
                                ShowMessage("clients not updated");
                            IsBusy = false;
                        }, null);

                }, null);
        }
        else
        {
            ShowMessage("error on get clients");
            IsBusy = false;
        }
    }, null);

怎么改成fluent Rx?我从这段代码开始:

    var invokeClients = Observable.FromAsyncPattern<Param, List<Client>>(service.BeginGetClients, service.EndGetClients);
    var invokeCachedClients = Observable.FromAsyncPattern<Param, List<Client>>(service.BeginGetCachedClients, service.EndGetCachedClients);
    invokeClients(param)
    .Subscribe(r =>
    {
        if(!CheckResult(r))
        {
            invokeCachedClients(param)
            .Subscribe(r2 =>
            {
                // TODO: next op
            });
        }
    });

有任何改进此代码的建议吗?也许另一种解决方案?我不喜欢这种级联代码...

谢谢!

4

3 回答 3

1
invokeClients(param)
    .Where(x => !CheckResult(x))
    .Select(invokeCachedClients)
    .Do(_ => IsBusy = true)
    .Merge()
    .Do(_ => IsBusy = false)
    .Subscribe(x => Console.WriteLine("Do something here"));

确保订阅,否则它将无法工作(就像没有 Foreach'ing LINQ 查询一样)

于 2011-12-21T19:44:52.373 回答
1

根据上面的答案

invokeClients(param)
  .Where(x => !CheckResult(x))
  .SelectMany(invokeCachedClients)
  .Subscribe(x => Console.WriteLine("Do something here"));

也可以写成

var query = from client in invokeClients(param)
  where !CheckResult(client)
  from cache in invokeCachedClients(client)
  select cache;

那么您可以将繁忙的标志包装在资源中

Func<IDisposable> busyResource = 
  () =>
  {
    IsBusy = true;
    return Disposable.Create(() => IsBusy = false);
  };

现在您可以将它与 Using 工厂一起使用。

Observable.Using(busyResource, _=>query)
  .Subscribe(x=>Console.Write("Do something here");

您倾向于使用 Using 方法而不是将 IsBusy 设置器放入 OnError 或 OnCompleted 的原因是,如果订阅也被释放,这将停止它。

我相信我们可能会做得比这更好,但我发现很难理解你实际上在做什么。老实说,我认为这实际上比 Rx 更适合 TPL 或数据工作流,即您实际上是在链接工作延续而不是对一系列事件做出反应。

于 2012-02-06T22:31:11.897 回答
0

有了这样的东西,你总是想从里到外。翻译这个最难的部分是异步调用之间的部分。如果您将一个权利的结果输入到下一个权利中,那将是一个直截了当的from x in async1() from y in async2(x) .... 我看到两个函数,我会将其分解为:

public IObserservable<string> UpdateCachedClients(object param)
{
    var getCachedClientsAsync = Observable.FromAsyncPattern<...>(service.BeginGetCachedClients, service.EndGetCachedClients);
    var updateClientsAsync = Observable.FromAsyncPattern<...>(service.BeginUpdateClients, service.EndUpdateClients);

    return Observable.Create<string>(obs =>
    {
        return (from r2 in getCachedClientsAsync(param)
                            .Do(v => 
                                { if (CheckResult(v))
                                      obs.OnNext("clients valid");
                                  else
                                      obs.OnNext("clients not valid");
                                  //huh? is this done twice
                                  UpdateClients(v); 
                                })
                from b in updateClientsAsync(r2)
                select (b ? "clients updated" : "clients not updated")
               ).Subscribe(obs);
    });
}

public IObservable<string> UpdateAllClients(object param)
{
    var getClientsAsync = Observable.FromAsyncPattern<...>(service.BeginGetClients, service.EndGetClients);
    return from r in getClientsAsync(param)
           select (CheckResult(r) ? 
                     Observable.Return("error on get clients") :
                     UpdateCachedClients(param));
}

我在第一个函数中使用了一个额外的层,Observable.Create因为它似乎是传递结果并继续下一个调用的最简单方法。一旦你有了这两个函数,你应该能够像这样调用它们:

IsBusy = true;
var disp = UpdateAllClients(param)
            .Subscribe(ShowMessage,
                       ex => IsBusy = false,
                       () => IsBusy = false);

请注意,我IsBusy在 OnError 和 OnCompleted 中都设置为 false,因为两者都是 IObservable 的终止消息。

TPL 似乎更适合像这样的异步方法,因为它们只产生一个值,但在具有 async/await 的语言的下一个版本之前,如果您使用 Tasks,您可能最终会得到与您的方法或我的方法相似的语法而不是 IObservables。

于 2011-12-22T00:36:02.800 回答