9

我有多个枚举器来枚举平面文件。我最初在 Parallel Invoke 中有每个枚举器,每个 Action 都添加到 aBlockingCollection<Entity>中,并且集合返回 ConsumingEnumerable();

public interface IFlatFileQuery
{
    IEnumerable<Entity> Run();
}

public class FlatFile1 : IFlatFileQuery
{
    public IEnumerable<Entity> Run()
    {
        // loop over a flat file and yield each result
        yield return Entity;
    }
} 

public class Main
{
    public IEnumerable<Entity> DoLongTask(ICollection<IFlatFileQuery> _flatFileQueries)
    {
            // do some other stuff that needs to be returned first:
            yield return Entity;

            // then enumerate and return the flat file data
        foreach (var entity in GetData(_flatFileQueries))
        {
            yield return entity;
        }
    }

    private IEnumerable<Entity> GetData(_flatFileQueries)
    {
        var buffer = new BlockingCollection<Entity>(100);

        var actions = _flatFileQueries.Select(fundFileQuery => (Action)(() =>
        {
            foreach (var entity in fundFileQuery.Run())
            {
                buffer.TryAdd(entity, Timeout.Infinite);
            }
        })).ToArray();

        Task.Factory.StartNew(() =>
        {
            Parallel.Invoke(actions);

            buffer.CompleteAdding();
        });

        return buffer.GetConsumingEnumerable();
    }
}

然而,经过一些测试后发现,下面的代码更改速度提高了大约 20-25%。

private IEnumerable<Entity> GetData(_flatFileQueries)
{
    return _flatFileQueries.AsParallel().SelectMany(ffq => ffq.Run());
}

代码更改的问题在于,它要等到所有平面文件查询都被枚举出来,然后才会返回可以枚举和生成的全部内容。

是否有可能在上面的代码中以某种方式让步以使其更快?

我应该补充一点,所有平面文件查询的组合结果最多可能只有 1000 个左右的实体。

编辑:将其更改为以下内容不会对运行时间产生影响。(R# 甚至建议回到原来的样子)

private IEnumerable<Entity> GetData(_flatFileQueries)
{
        foreach (var entity in _flatFileQueries.AsParallel().SelectMany(ffq => ffq.Run()))
        {
            yield return entity;
        }
}
4

4 回答 4

3

代码更改的问题在于,它要等到所有平面文件查询都被枚举出来,然后才会返回可以枚举和生成的全部内容。

让我们通过一个简单的例子来证明它是错误的。首先,让我们创建一个TestQuery在给定时间后将产生单个实体的类。其次,让我们并行执行几个测试查询并测量产生结果所需的时间。

public class TestQuery : IFlatFileQuery {

    private readonly int _sleepTime;

    public IEnumerable<Entity> Run() {
        Thread.Sleep(_sleepTime);
        return new[] { new Entity() };
    }

    public TestQuery(int sleepTime) {
        _sleepTime = sleepTime;
    }

}

internal static class Program {

    private static void Main() {
        Stopwatch stopwatch = Stopwatch.StartNew();
        var queries = new IFlatFileQuery[] {
            new TestQuery(2000),
            new TestQuery(3000),
            new TestQuery(1000)
        };
        foreach (var entity in queries.AsParallel().SelectMany(ffq => ffq.Run()))
            Console.WriteLine("Yielded after {0:N0} seconds", stopwatch.Elapsed.TotalSeconds);
        Console.ReadKey();
    }

}

此代码打印:

1 秒
后输出 2 秒
后输出 3 秒后输出

您可以看到此输出AsParallel()将在每个结果可用时立即产生,因此一切正常。请注意,根据并行度的不同,您可能会得到不同的时序(例如并行度为 1 的“2s、5s、6s”,有效地使整个操作根本不并行)。此输出来自 4 核机器。

如果线程之间没有共同的瓶颈(例如共享锁定资源),您的长时间处理可能会随着内核数量的增加而扩展。您可能想要分析您的算法,以查看是否有可以使用dotTrace等工具改进的慢速部分。

于 2012-11-14T16:52:28.440 回答
2

我认为您的代码中的任何地方都没有危险信号。没有令人发指的低效率。我认为这归结为多个较小的差异。

PLINQ 非常擅长处理数据流。在内部,它比将项目逐个添加到同步列表更有效。我怀疑您的呼叫TryAdd是一个瓶颈,因为每个呼叫都需要在内部至少进行两次Interlocked操作。这些会给处理器间内存总线带来巨大的负载,因为所有线程都将竞争相同的高速缓存行。

PLINQ 更便宜,因为它在内部进行了一些缓冲。我确定它不会一个接一个地输出项目。可能它将它们分批并以这种方式将同步成本摊销到多个项目上。

第二个问题是BlockingCollection. 100不是很多。这可能会导致很多等待。等待代价高昂,因为它需要调用内核和上下文切换。

于 2012-11-14T17:24:15.283 回答
2

我做了这个在任何情况下都对我有用的替代方案:

这对我有用:

  • 在 ConcurrentQueue 中的 Parallel.Foreach 队列中的任务中,转换为要处理的项目。
  • 该任务有一个继续标记该任务结束的标志。
  • 在同一个执行线程中,任务结束一段时间出队并产生

对我来说快速而出色的结果:

Task.Factory.StartNew (() =>
{
    Parallel.ForEach<string> (TextHelper.ReadLines(FileName), ProcessHelper.DefaultParallelOptions,
    (string currentLine) =>
    {
        // Read line, validate and enqeue to an instance of FileLineData (custom class)
    });
}).
ContinueWith 
(
    ic => isCompleted = true 
);


while (!isCompleted || qlines.Count > 0)
{
    if (qlines.TryDequeue (out returnLine))
    {
        yield return returnLine;
    }
}
于 2013-05-20T06:00:17.607 回答
0

默认情况下,ParallelQuery该类在处理IEnumerable<T>源时采用称为“块分区”的分区策略。使用这种策略,每个工作线程每次都会抓取越来越多的项目。这意味着它有一个输入缓冲区。然后将结果累积到一个输出缓冲区中,该缓冲区的大小由系统选择,然后才可供查询的使用者使用。您可以使用配置选项EnumerablePartitionerOptions.NoBufferingParallelMergeOptions.NotBuffered.

private IEnumerable<Entity> GetData(ICollection<IFlatFileQuery> flatFileQueries)
{
    return Partitioner
        .Create(flatFileQueries, EnumerablePartitionerOptions.NoBuffering)
        .AsParallel()
        .AsOrdered()
        .WithMergeOptions(ParallelMergeOptions.NotBuffered)
        .SelectMany(ffq => ffq.Run());
}

这样,每个工作线程一次只会抓取一个项目,并在计算结果后立即传播结果。

NoBuffering:创建一个分区器,它一次从可枚举的源中获取项目,并且不使用可以被多个线程更有效地访问的中间存储。此选项提供对低延迟的支持(项目将在源可用时立即处理)并为项目之间的依赖关系提供部分支持(线程不能死锁等待线程本身负责处理的项目)。

NotBuffered:使用没有输出缓冲区的合并。计算出结果元素后,立即将该元素提供给查询的使用者。

于 2020-02-08T01:53:17.603 回答