2

我必须调用属于外部组件的方法。该方法的签名如下所示:

IImportedData Import(string fileName, Action<int> progress);

这个操作可能需要很长时间才能执行,所以我需要异步调用它并向用户报告进度。我正在寻找不同的方法来调用它(Rx、TPL、ThreadPool)以找到富有表现力和清晰的东西,但是我正在努力想出一种在 Rx 中实现它的方法。

乍一看,使用 Rx 报告进度的想法似乎非常合适——它是与进度相关的传入整数流。唯一的问题是,当操作完成时,我需要检查IImportedData以向用户显示结果。OnCompleted 并非用于此目的,这使我走上了一条路径,即拥有一个公开两个 IObservable 流的类,然后是一个“启动”操作的方法。

private class Importer : IObservable<int>, IObservable<IImportedData>

感觉很笨重,我敢肯定有更好的方法,我不知道。

4

2 回答 2

3

前面想到两件事:

  1. Task<T>IProgress<T>似乎更适合这项任务
  2. 实施IObservable<T>通常是不受欢迎的。推荐使用静态方法创建复合实例Observable

如果您坚持使用 Rx,我建议您看看几年前我与 Rx 人员进行的讨论。Jeffrey van Gogh 最终推荐了一个Either<TLeft, TRight>响应,您将根据消息是“进度”事件还是“结果”事件自动路由回调。如果我再次处于那个位置,那肯定是我会采取的方向。

于 2013-08-21T00:01:27.793 回答
0

你可以尝试这样的事情:

Func<string, Action<int>, IObservable<IImportedData>> create =
    (fileName, progress) =>
        Observable.Create<IImportedData>(o =>
        {
            var foo = new Foo();
            var subject = new BehaviorSubject<int>(0);

            var d1 = subject.Subscribe(progress);

            var result = foo.Import(fileName, n => subject.OnNext(n));

            var d2 = subject
                .Where(x => x == 100)
                .Select(x => result)
                .Subscribe(o);

            return new CompositeDisposable(d1, d2);
        });

我还没有测试过它,但是这样的事情应该会给你一个相对干净的 Rx 方式来做你所追求的。您可能需要添加同步上下文。

于 2013-09-05T00:30:18.217 回答