2

我对使用 Reactive Extensions 很陌生,所以这可能是一个新手问题,但我有以下情况:

我从数据库中获取 3 个 IEnumerable 列表(不同类型)并填充视图模型。但是,我想协调订阅以在所有列表完成加载后触发某些事情。反应式扩展是否可以做到这一点,或者我是否以错误的方式思考?代码看起来像这样:

GetCustomers()
    .ToObservable(Scheduler.Default)
    .Buffer(20).ObserveOn(SynchronizationContext.Current)
    .Subscribe(View.Model.AddRange);
GetCountries()
    .ToObservable(Scheduler.Default)
    .Buffer(20).ObserveOn(SynchronizationContext.Current)
    .Subscribe(View.Model.AddRange);
GetTransports()
    .ToObservable(Scheduler.Default)
    .Buffer(20).ObserveOn(SynchronizationContext.Current)
    .Subscribe(View.Model.AddRange);
4

2 回答 2

3

您可以尝试使用可观察连接。像这样的东西:

var plan =
    Observable.Start(() => GetCountries())
        .And(Observable.Start(() => GetCustomers()))
        .And(Observable.Start(() => GetTransports()))
        .Then((countries, customers, transports)
            => new { countries, customers, transports });

var query =
    Observable.When(new [] { plan });

query
    .Subscribe(cct =>
    {
        View.Model.AddRange(cct.countries);
        View.Model.AddRange(cct.customers);
        View.Model.AddRange(cct.transports);
    });

它并行运行,最后您将所有结果合二为一。

于 2012-10-30T00:59:46.427 回答
0

我不确定为什么要将已经同步的 Enumerable 更改为 Observable,但是在 Rx 习惯用法中,您可以:

Observable.Merge(Add(GetCustomers()), Add(GetCountries())..., Add(GetTransports()))
           .Subscribe(() => { }, Completed);

其中 Add 可能是:

    private IObservable<Unit> Add<T>(IObservable<T> o)
    {
        return o.Buffer(20)
                .ObserveOn(SynchronizationContext.Current)
                .Do(View.Model.AddRange)
                .Select(_ => Unit.Default);
    }
于 2012-10-29T16:02:09.257 回答