0

只是想了解一下 Rx

我正在使用 Rx 每 2 秒轮询一次网站

var results = new List<MyDTO>();
var cx = new WebserviceAPI( ... );
var callback = cx.GetDataAsync().Subscribe(rs => { results.AddRange(rs); });
var poller = Observable.Interval(TimeSpan.FromSeconds(2)).Subscribe( _ => { cx.StartGetDataAsync(); });

(Web 服务 API 公开了一个 getItemsAsync/getItemsCompleted 事件处理程序类型机制,我从中创建了一个 observable)。

当网站返回时,我将响应的“业务部分”解压缩到 DTO 的 IEnumerable 中

public IObservable<IEnumerable<MyDTO>> GetDataAsync()
{
    var o = Observable.FromEventPattern<getItemsCompletedEventHandler,getItemsCompletedEventArgs>(
        h => _webService.getItemsCompleted += h,
        h => _webService.getItemsCompleted -= h);

    return o.Select(c=> from itm in c.EventArgs.Result.ItemList
                        select new MyDTO()
                        {
                           ...
                        });
}

我的理由是,鉴于所有数据都在字符串中,因此将其打包到 IEnumerable 中是有意义的……但现在我不确定这是否正确!

如果网站响应时间超过 2 秒,我发现 MSTest 正在崩溃。调试时,产生的错误是

“异步处理过程中出现错误。多个异步同时操作需要唯一的状态对象才能处于未完成状态”

除了内部异常

“已添加项目。字典中的键:'System.Object' 正在添加的键:'System.Object'”

我假设问题是重入问题之一,因为下一个调用是在上一个调用完成填充数据之前开始并返回数据。

所以我不确定是否

  1. 我把这件事放在一起很正确
  2. 我应该以某种方式限制连接以避免重新进入。
  3. 我应该使用不同的中间数据结构(或机制)而不是 IEnumerable

我会很感激一些指导。

编辑 1:所以我更改了网络调用以包含一个唯一的状态对象

public void StartGetDataAsync()
{
    ...
    //  was: _webService.getItemsAsync(request);
    _webService.getItemsAsync(request, Guid.NewGuid());
}

并使它工作。但我仍然不确定这是否是正确的方法

编辑 2 - Web 服务信号:我正在使用 webServiceApi 类包装的肥皂网络服务。创建的references.cs包含以下方法

public void getItemsAsync(GetItemsReq request, object userState) 
{
    if ((this.getItemsOperationCompleted == null)) 
    {
        this.getItemsOperationCompleted = new System.Threading.SendOrPostCallback(this.OngetItemsOperationCompleted);
    }
    this.InvokeAsync("getItems", new object[] {
                    request}, this.getItemsOperationCompleted, userState);
}

private System.Threading.SendOrPostCallback getItemsOperationCompleted;

public event getItemsCompletedEventHandler getItemsCompleted;

public delegate void getItemsCompletedEventHandler(object sender, getItemsCompletedEventArgs e);

public partial class getItemsCompletedEventArgs : System.ComponentModel.AsyncCompletedEventArgs 
{
    ...
}

private void OngetItemsOperationCompleted(object arg) 
{
    if ((this.getItemsCompleted != null)) 
    {
        System.Web.Services.Protocols.InvokeCompletedEventArgs invokeArgs = ((System.Web.Services.Protocols.InvokeCompletedEventArgs)(arg));
        this.getItemsCompleted(this, new getItemsCompletedEventArgs(invokeArgs.Results, invokeArgs.Error, invokeArgs.Cancelled, invokeArgs.UserState));
    }
 }

可能给你太多(或错过了什么)!

谢谢

4

2 回答 2

1

我想我为你找到了一个不错的起点。

基本上,我认为您需要抽象出 Web 服务的复杂性并创建一个干净的函数来获得结果。

尝试这样的事情:

Func<GetItemsReq, IObservable<getItemsCompletedEventArgs>> fetch =
    rq =>
        Observable.Create<getItemsCompletedEventArgs>(o =>
        {
            var cx = new WebserviceAPI(/* ... */);
            var state = new object();
            var res =
                Observable
                    .FromEventPattern<
                        getItemsCompletedEventHandler,
                        getItemsCompletedEventArgs>(
                        h => cx.getItemsCompleted += h,
                        h => cx.getItemsCompleted -= h)
                    .Where(x => x.EventArgs.UserState == state)
                    .Take(1)
                    .Select(x => x.EventArgs);
            var subscription = res.Subscribe(o);
            cx.getItemsAsync(rq, state);
            return subscription;
        });

就我个人而言,我会更进一步,定义一个返回类型,比如GetItemsReq,它不包括用户状态对象,但与getItemsCompletedEventArgs.

然后,您应该能够使用Observable.Interval创建您需要的轮询。

如果您的 Web 服务实现IDisposable了,那么您应该Observable.Using在上述函数中添加一个调用,以便在 Web 服务完成后正确处理它。

让我知道这是否有帮助。

于 2012-05-24T04:33:20.200 回答
1

Enigmativity 有一个很好的解决方案。

          var state = new object();

          cx.getItemsAsync(rq, state);

这些陈述是解决错误的关键。

每次调用 Async 方法时,都需要将一个唯一的对象传递给它(当然,除非一次只运行一个 Async 方法,这是极不可能的)。我为此苦苦思索了好几个小时,直到我发现你可以传递一个对象参数。如果没有该参数,它会将多个方法视为同一个调用,并为您提供“已添加项目。字典中的键:'System.Object' 正在添加的键:'System.Object'”消息。

于 2013-10-29T20:55:03.453 回答