1

我对 Observables 有点陌生,所以我只是在寻找一个能让我朝着正确方向前进的例子(也许是教程?)。所以就在这里 - 我想创建异步Observable并从中Exception获取。这是我的例子:

    protected IObservable<Tuple<DataPart1, DataPart2>> LoadAllDataFunc(string FileName)
    {
        return Observable.Start<Tuple<DataPart1, DataPart2>>(() =>
        {
            ConfigReaderWriter readerWriter = new ConfigReaderWriter();
            try
            {
                readerWriter.UnpackFile(fileName, out DataPart1, out DataPart2);
                return Tuple.Create(DataPart1, DataPart2);
            }
            catch (Exception exp_gen)
            {
                Observable.Throw<Exception>(exp_gen);
                return null;
            }
        });
    }

问题是我认为我没有Exception正确投掷。例如 - 任何订阅者:

    internal IObservable<DataPart2> GetProject()
    {
        if (this.GlobalDataPart2 != null)
            return Observable.Return(GlobalDataPart2);

        IObservable<Project> receivedData = null;

        var loadAll = LoadAllDataFunc(this.GlobalFileName).Subscribe(
                data => { receivedData = Observable.Return(data.Item1); },
                (ex) => { Observable.Throw<Exception>(ex); }
            );

        return receivedData;
    }

不会收到Exception来自LoadAllDataFunc?即使发生异常,订阅者也会收到null.

那么 - 从 Observable 抛出异常的正确方法是什么?

4

2 回答 2

3

只是为了清楚地说明 Observable.Throw 的作用:它返回一个可观察的序列(具有指定的元素类型),其唯一作用是通过调用其 OnError 方法告诉所有观察者有关异常的信息:

var err = Observable.Throw<int>(new Exception("Oops!"));
err.Subscribe(_ => {}, ex => { Console.WriteLine(ex.Message); }, () => {});
err.Subscribe(_ => {}, ex => { Console.WriteLine(ex.Message); }, () => {});

上面的代码将打印 Oops!两次。

正如 Gideon 所提到的,像 Start 这样的操作符会将用户异常传播到 OnError 通道。但是,如果您使用 Observable.Create,请直接与观察者讨论错误情况:

var res = Observable.Create<int>(observer =>
{
    return scheduler.Schedule(() =>
    {
        var res = default(int);

        try
        {
            res = ComputationThatMayFail();
        }
        catch (Exception ex)
        {
            observer.OnError(ex);
            return;
        }

        observer.OnNext(res);
        observer.OnCompleted();
    });
});

事实上,上面显示的代码与 Start 所做的非常接近。(一个区别与使用 AsyncSubject 缓存操作结果有关。)

于 2012-08-08T19:08:52.080 回答
0

使用Observable.Start时,最好的方法是不捕获异常。 Observable.ThrowObservable.Return简单地创建可观察的序列;IObservable正如您似乎期望的那样,他们没有从内部方法返回做任何特别的事情。这就是为什么您没有OnError从最上面的代码中获得调用的原因。对 Observable.Throw 的调用创建了一个 observable,对它不做任何事情,并从函数返回一个空值,然后将其发送到观察者的OnNext.

您的第二段代码似乎有相关问题。同样,对 Throw 的调用将无济于事。该订阅者的 OnNext 操作也可能不是您想要的。通常,您将从该函数返回 null ,因为返回LoadAllDataFunc时可能不会出现第一个结果GetProject(订阅通常不会阻塞。)我猜您想要做的是使用Select

return LoadAllDataFunc(this.GlobalFileName)
       .Select(data => data.Item1); //should this be Item2?
于 2012-08-08T13:14:50.367 回答