7

我正在尝试使用 Reactive Extensions (Rx) 在任务完成时缓冲它们的枚举。有谁知道是否有一种干净的内置方法可以做到这一点?ToObservable扩展方法只会生成一个,这IObservable<Task<T>>不是我想要的,我想要一个IObservable<T>,然后我可以使用Buffer它。

人为的例子:

//Method designed to be awaitable
public static Task<int> makeInt()
{
     return Task.Run(() => 5);
}

//In practice, however, I don't want to await each individual task
//I want to await chunks of them at a time, which *should* be easy with Observable.Buffer 
public static void Main() 
{
    //Make a bunch of tasks
    IEnumerable<Task<int>> futureInts = Enumerable.Range(1, 100).Select(t => makeInt());

    //Is there a built in way to turn this into an Observable that I can then buffer?
    IObservable<int> buffered = futureInts.TasksToObservable().Buffer(15); //????

    buffered.Subscribe(ints => {
        Console.WriteLine(ints.Count()); //Should be 15
    });
}
4

1 回答 1

8

您可以使用可以使用另一个重载Task转换为 observable的事实。ToObservable()

当您拥有(单项)可观察对象的集合时,您可以使用Merge().

因此,您的代码可能如下所示:

futureInts.Select(t => t.ToObservable())
          .Merge()
          .Buffer(15)
          .Subscribe(ints => Console.WriteLine(ints.Count));
于 2013-06-11T18:34:50.150 回答