5

我有一个类型序列IObservable<T>和一个映射T, CancellationTokenTask<U>. 摆脱它们的最干净的方法是IObservable<U>什么?

我需要以下语义:

  • 每个任务在前一个项目的任务完成后开始
  • 如果任务已被取消或出错,则跳过
  • 原始序列的顺序被严格保留

这是我看到的签名:

public static IObservable<U> Select<T, U> (
    this IObservable<T> source,
    Func<T, CancellationToken, Task<U>> selector
);

我还没有写任何代码,但除非有人打败我,否则我会写。
无论如何,我不熟悉像这样的运算符Window,所以我的解决方案可能不太优雅。

我需要 C# 4 中的解决方案,但为了比较,也欢迎 C# 5 答案。


如果您好奇,以下是我的真实场景,或多或少:

Dropbox.GetImagesRecursively ()
    .ObserveOn (SynchronizationContext.Current)
    .Select (DownloadImage)
    .Subscribe (AddImageToFilePicker);
4

1 回答 1

3

到目前为止,这似乎对我有用:

public static IObservable<U> Select<T, U> (
    this IObservable<T> source,
    Func<T, CancellationToken, Task<U>> selector)
{
    return source
        .Select (item => 
            Observable.Defer (() => 
                Observable.StartAsync (ct => selector (item, ct))
                    .Catch (Observable.Empty<U> ())
            ))
        .Concat ();
}

我们将一个基于延迟任务的异常吞咽可观察对象映射到每个项目,然后将它们连接起来。


我的思考过程是这样的。

我注意到其中一个SelectMany重载几乎完全符合我的要求,甚至具有完全相同的签名。但它并没有满足我的需求:

  • 它会在原始项目出现时创建任务,而我需要等待每个任务完成
  • 它没有提供跳过取消和错误任务的选项

我查看了这个重载的实现,并注意到它用于FromAsync处理任务创建和取消:

public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult> (IObservable<TSource> source, Func<TSource, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector)
{
    return SelectMany_<TSource, TTaskResult, TResult> (
        source,
        x => FromAsync (ct => taskSelector (x, ct)),
        resultSelector
    );
}

我转过头来FromAsync看看它是如何实现的,并惊喜地发现它也是可组合的:

public virtual IObservable<TResult> FromAsync<TResult> (Func<CancellationToken, Task<TResult>> functionAsync)
{
    return Defer (() => StartAsync (functionAsync));
}

我重用了Deferand StartAsync,同时也增加Catch了吞咽错误。并确保任务相互等待并以原始顺序启动DeferConcat

于 2013-07-12T16:02:38.373 回答