4

给定类结构:

public class Foo
{
    public IEnumerable<Bar> GetBars()
    {
        for(int i = 0; i < 1000; i++)
        {
            Thread.Sleep(1000);
            yield return new Bar() { Name = i.ToString() };
        }
    }
}

public class Bar
{
    public string Name { get; set; }
}

我有列表,并且在方法IEnumerable<Foo>中检索下一个的时间很慢(上面用 模拟)。BarGetBars()Thread.Sleep(1000)

我想做以下事情:

myFoo.AsParallel().SelectMany(foo => foo.GetBars().Select(bar => bar.Name))

但是由于延迟,我想继续Bar为每个预加载下一个值Foo,然后按照它们变得可访问的顺序将IEnumable<Bar>每个值Foo合并到彼此中。

我一直在研究 Tpl Dataflow 异步 nuget 库(特别是TransformBlock在较小程度上ActionBlock),但找不到任何可以帮助我做我想做的事情的东西。

4

4 回答 4

2

问题是,无论是否平行,在获得第一个之前,您甚至无法开始获得第二个Bar对象。仅当您通过 LINQ 功能对每个对象进行长时间运行处理时,使用 PLINQ 才真正有帮助,如果延迟是由于底层IEnumerable.

一种选择是返回一系列Task对象,这样移动迭代器需要很少的时间:

public async Task<Bar> GenerateFoo()
{
    await Task.Delay(1000);
    return new Bar() { Name = i.ToString() };
}

public IEnumerable<Task<Bar>> GetBars()
{
    for(int i = 0; i < 1000; i++)
    {
        yield return GenerateFoo();
    }
}

使用该代码意味着仅移动迭代器只会开始生成Bar,而不是等到它完成。一旦你有了它,你可以为每个任务添加延续来处理每个 的处理Bar,或者你可以使用一种方法,例如Task.WaitAllTask.WhenAll等待它们全部完成。

于 2013-05-28T14:16:46.013 回答
1

我最终编写了一个新的实现,IEnumerable<T>它执行了预知:

public IEnumerator<T> GetEnumerator()
{
    TaskFactory<T> taskFactory = new TaskFactory<T>();
    Task<T> task = null;
    IEnumerator<T> enumerator = Source.GetEnumerator();

    T result = null;
    do
    {
        if (task != null)
        {
            result = task.Result;
            if (result == null)
                break;
        }

        task = taskFactory.StartNew(() =>
        {
            if (enumerator.MoveNext())
                return enumerator.Current;
            else
                return null;
        });
        if (result != null)
            yield return result;
    }
    while (task != null);
}

它只是在返回第一个结果之前请求前两个结果,然后总是在已经产生的结果之前保留一个结果请求。

于 2013-05-30T10:40:34.523 回答
1

我建议查看Reactive Extensions (Rx) Library。它基本上允许您在“推”类型集合(IObservable<T>)上使用 LINQ,而不是“拉”类型集合(IEnumerable<T>)。换句话说,您的代码可以在集合中的新项目可用时对其做出反应。

于 2013-05-28T14:13:37.800 回答
1

您可以编写如下的扩展方法,该方法将在可用时立即生成柱(在任何可枚举中)。

myFoo.Select(x=>x.GetBars()).Flatten().Select(bar => bar.Name)

public static class ParallelExtensions
{
    public static IEnumerable<T> Flatten<T>(this IEnumerable<IEnumerable<T>> enumOfEnums)
    {
        BlockingCollection<T> queue = new BlockingCollection<T>();

        Task.Factory.StartNew(() =>
        {
            Parallel.ForEach(enumOfEnums, e =>
            {
                foreach (var x in e)
                {
                    queue.Add(x);
                }
            });
            queue.CompleteAdding();
        });

        return queue.GetConsumingEnumerable();
    }
}
于 2013-05-28T14:36:22.547 回答