1

我正在尝试一个“类型到搜索”的 Reactive-Extensions 示例,该示例从文本框(WPF 应用程序,如果它重要的话)中获取一个字符串,并进行可能冗长的服务器端搜索(在我的情况下使用 Thread.Sleep 进行模拟) 并在列表框中显示结果。

这个特定“模块”的主要特点是:

  • 异步搜索;搜索时 UI 未冻结
  • 节流;快速键入不会为每次击键生成服务器端搜索
  • 不同的搜索;在搜索“asd”并显示结果后快速键入 [退格],d(即删除最后一个字符并快速重新键入)不会重做服务器端搜索
  • 下降中介结果;如果我键入“asd”并短暂等待(导致启动服务器端搜索),然后在显示 asd 的结果之前完成搜索字符串,那些中间/特定结果将被删除

问题是,在重型方法(执行“服务器端搜索”的方法)出现一个异常之后,订阅被终止并且无法使用

到目前为止,我只是通过重新订阅 IObservable 对象找到了一种解决方法,但这感觉不对。我也尝试过 .Retry() 但尽管我可以重用订阅,但我的OnError处理程序不再被调用。

代码如下所示:

    private IObservable<object> _resultsFromTypeToSearch;

    private void SetupObserver()
    {
        var throttledUserInput =
            (from evt in Observable.FromEventPattern<TextChangedEventArgs>(TxtSearch, "TextChanged")
             select ((TextBox)evt.Sender).Text)
                .Throttle(TimeSpan.FromSeconds(0.6)) // this ensures that only after 0.6 seconds the user input is taken into consideration
                .DistinctUntilChanged();             // this ensures only changed input is taken into consideration

        var getDataFunc = new Func<string, object>(GetExpensiveData);
        var searchAsync = Observable.FromAsyncPattern<string, object>(getDataFunc.BeginInvoke, getDataFunc.EndInvoke);

        var z = from text in throttledUserInput
                from word in searchAsync(text).TakeUntil(throttledUserInput)
                select word;  // TakeUntil will drop an ongoing search if a new search is requested in the meantime
        _resultsFromTypeToSearch = z.ObserveOn(TxtSearch); // this ensures that we'll get notified on the UI thread
        _resultsFromTypeToSearch.Subscribe(PresentResults, OnError);
    }

    private void OnError(Exception obj)
    {
        ClearUI();
        MessageBox.Show("Error");

        _resultsFromTypeToSearch.Subscribe(PresentResults, OnError); // THIS IS MY WORKAROUND WHICH FEELS BAD
    }

    private void ClearUI()
    {
        IsBusy = false;
        Results.Clear();
    }

    private void PresentResults(object result)
    {
        ClearUI();
        Results.Add(result.ToString());
    }

    private object GetExpensiveData(string searchString)
    {
        IsBusy = true;
        if (DateTime.Now.Millisecond % 3 == 0) throw new ServerException();
        Thread.Sleep(2000);
        return "Data for " + searchString;
    }

有没有更好的方法来做到这一点?

4

2 回答 2

1

操作员在这里提供Catch帮助。

在您的代码中,我认为您可以这样做以防止订阅在 searchAsync 失败时结束:

var searchAsync = Observable
    .FromAsyncPattern<string, object>(getDataFunc.BeginInvoke, getDataFunc.EndInvoke)
    .Catch(e => Observable.Empty<object>()) // eat the error and return "no results" for this search.

你可以返回任何你想要的新的 observable。您可以记录错误并返回另一个搜索尝试。在这种情况下,我只返回一个空的 observable,它使整个订阅的行为就好像该搜索尝试从未发生过一样。

于 2013-04-17T20:15:52.563 回答
0

我误解了 Rx。一个序列一旦出现故障就不能被重用。简单的解决方案是重新创建订阅,即:再次调用 SetupObserver()

于 2013-05-30T18:14:15.890 回答