首先,一个典型的 Rx 搜索管道如下所示:
// Typical Rx search pipeline: turn every text change into an asynchronous search
// and surface only the results that belong to the most recent query.
var searchResults = Observable.FromEventPattern<SomeEventArgs>(searchBar, "TextChanged")
    .Select(pattern => pattern.EventArgs.SearchText)
    // Wait for a pause in typing before searching.
    // NOTE(review): the 300 ms quiet period is illustrative only - tune as needed.
    .Throttle(TimeSpan.FromMilliseconds(300))
    // Do not resubmit a query that is identical to the previous one.
    .DistinctUntilChanged()
    // Each query becomes its own result stream, giving a stream of result streams...
    .Select(text => Observable.Start(() => Search(text)))
    // ...and Switch follows only the most recently created one, so results of
    // superseded searches are ignored.
    .Switch();
Select 会生成一个由结果流组成的流(即“流的流”),而 Switch 只订阅并返回其中最近创建的那个结果流。我还添加了 DistinctUntilChanged,以防止重复提交相同的查询。
针对您所描述的需求,一种策略是为 Throttle 操作符提供一个节流持续时间选择器(throttle duration selector),该选择器会在节流时间到期之后、或者按钮被单击时发出信号。我拼凑了一个示例 ViewModel,除 Rx 2.1 之外不使用任何其他库,以展示具体做法。下面是完整的 ViewModel——View 和 Repository 就留给您自行想象,但它们的作用应该很清楚。
最后提醒一点——我尽量让这个示例保持简短,省略了可能妨碍理解的非必要细节,因此这段代码还不能直接用于生产环境:
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Runtime.CompilerServices;
using System.Windows.Input;
using StackOverflow.Rx.Annotations;
using StackOverflow.Rx.Model;
namespace StackOverflow.Rx.ProductSearch
{
    /// <summary>
    /// View model for a product search box. A search is started either after the
    /// user has stopped editing <see cref="Query"/> for a short delay, or
    /// immediately when <see cref="ImmediateSearch"/> is executed.
    /// </summary>
    public class ClassicProductSearchViewModel : INotifyPropertyChanged
    {
        // Quiet period after the last edit of Query before a search is started.
        // NOTE(review): the concrete value is an assumption - confirm the desired delay.
        private static readonly TimeSpan ThrottleDelay = TimeSpan.FromSeconds(2);

        private string _query;
        private readonly IProductRepository _productRepository;
        private IList<Product> _productSearchResults;

        /// <summary>
        /// Creates the view model and wires the search pipeline to <see cref="Query"/>
        /// changes and to the <see cref="ImmediateSearch"/> command.
        /// </summary>
        /// <param name="productRepository">Repository used to run the product query.</param>
        public ClassicProductSearchViewModel(IProductRepository productRepository)
        {
            _productRepository = productRepository;

            // Wire up a Button from the view to this command with a binding like
            // <Button Content="Search" Command="{Binding ImmediateSearch}"/>
            ImmediateSearch = new ReactiveCommand();

            // Wire up the Query text from the view with
            // a binding like <TextBox MinWidth="100" Text="{Binding Query, UpdateSourceTrigger=PropertyChanged}"/>
            var newQueryText = Observable.FromEventPattern<PropertyChangedEventHandler, PropertyChangedEventArgs>(
                    h => PropertyChanged += h,
                    h => PropertyChanged -= h)
                .Where(@event => @event.EventArgs.PropertyName == "Query")
                .Select(_ => Query);

            // This duration selector will emit EITHER after the delay OR when the command executes
            var throttleDurationSelector = Observable.Return(Unit.Default)
                .Delay(ThrottleDelay)
                .Merge(ImmediateSearch.Select(x => Unit.Default));

            // NOTE(review): the subscription is never disposed and results are assigned
            // on whatever thread the search completes on; add disposal and an
            // ObserveOn(...) for the UI thread if the host requires it.
            newQueryText
                .Throttle(x => throttleDurationSelector)
                // Skip a query that is identical to the one submitted last.
                .DistinctUntilChanged()
                /* Your search query here */
                // NOTE(review): assumes FindProducts is synchronous; if it returns a Task,
                // Observable.StartAsync is presumably the right call - confirm.
                .Select(
                    text =>
                        Observable.Start(
                            () => _productRepository.FindProducts(new ProductNameStartsWithSpecification(text))))
                // Follow only the most recent search; superseded results are ignored.
                .Switch()
                .Subscribe(products => ProductSearchResults = new List<Product>(products));
        }

        /// <summary>Results of the most recently completed search.</summary>
        public IList<Product> ProductSearchResults
        {
            get { return _productSearchResults; }
            set
            {
                if (Equals(value, _productSearchResults)) return;
                _productSearchResults = value;
                OnPropertyChanged();
            }
        }

        /// <summary>
        /// Command that triggers the search immediately instead of waiting for the delay.
        /// The pipeline captures the instance created in the constructor, so replacing
        /// this property afterwards does not re-wire the search.
        /// </summary>
        public ReactiveCommand ImmediateSearch { get; set; }

        /// <summary>Current search text; a change raises <see cref="PropertyChanged"/>.</summary>
        public string Query
        {
            get { return _query; }
            set
            {
                if (value == _query) return;
                _query = value;
                OnPropertyChanged();
            }
        }

        public event PropertyChangedEventHandler PropertyChanged;

        /// <summary>Raises <see cref="PropertyChanged"/> for the given property.</summary>
        /// <param name="propertyName">
        /// Name of the changed property; supplied by the compiler when omitted.
        /// </param>
        protected virtual void OnPropertyChanged([CallerMemberName] string propertyName = null)
        {
            // Copy to a local so a concurrent unsubscribe cannot null the delegate
            // between the check and the invocation.
            PropertyChangedEventHandler handler = PropertyChanged;
            if (handler != null) handler(this, new PropertyChangedEventArgs(propertyName));
        }
    }

    /// <summary>
    /// A command that is also an IObservable! Every execution pushes the command
    /// parameter to the subscribers.
    /// </summary>
    public class ReactiveCommand : ICommand, IObservable<object>
    {
        private bool _canExecute = true;
        private readonly Subject<object> _execute = new Subject<object>();

        /// <summary>Creates the command.</summary>
        /// <param name="canExecute">
        /// Optional stream of enabled/disabled states; when omitted the command is
        /// always executable.
        /// </param>
        public ReactiveCommand(IObservable<bool> canExecute = null)
        {
            if (canExecute != null)
            {
                canExecute.Subscribe(x =>
                {
                    _canExecute = x;

                    // Tell bound controls to re-query CanExecute.
                    // NOTE(review): raised on the thread canExecute emits on - UI
                    // frameworks may require that to be the UI thread.
                    EventHandler handler = CanExecuteChanged;
                    if (handler != null) handler(this, EventArgs.Empty);
                });
            }
        }

        /// <summary>Returns the latest value received from the canExecute stream.</summary>
        public bool CanExecute(object parameter)
        {
            return _canExecute;
        }

        /// <summary>Publishes <paramref name="parameter"/> to all subscribers.</summary>
        public void Execute(object parameter)
        {
            _execute.OnNext(parameter);
        }

        public event EventHandler CanExecuteChanged;

        /// <summary>Subscribes <paramref name="observer"/> to the stream of executions.</summary>
        /// <returns>A handle that ends the subscription when disposed.</returns>
        public IDisposable Subscribe(IObserver<object> observer)
        {
            return _execute.Subscribe(observer);
        }
    }
}
像 Rxx 和 ReactiveUI 这样的库可以让这段代码更简洁——我在这里没有使用它们,是为了尽量减少“魔法”成分!
本例中的 ReactiveCommand 是对 ReactiveUI 中同名类型的一个简化实现。它同时表现为一个命令(ICommand)和一个 IObservable:每当命令被执行时,它都会把命令参数推送到流中。
这是作者博客中使用 ReactiveUI 的示例:http://blog.paulbetts.org/index.php/2010/06/22/reactivexaml-series-reactivecommand/