2

我想并行化逐帧处理多个视频剪辑的应用程序。每个剪辑的每一帧的序列很重要(显然)。我决定使用 TPL 数据流,因为我相信这是数据流的一个很好的例子(电影帧是数据)。

所以我有一个从数据库加载帧的进程(可以说是一批 500 个,全部聚集在一起)

Example sequence:    
|mid:1 fr:1|mid:1 fr:2|mid:2 fr:1|mid:3 fr:1|mid:1 fr:3|mid:2 fr:2|mid:2 fr:3|mid:1 fr:4|

并将它们发布到 BufferBlock。对于这个 BufferBlock,我将 ActionBlocks 与过滤器链接起来,让每个 MovieID 有一个 ActionBlock,这样我就可以获得某种数据分区。每个动作块都是顺序的,但理想情况下,多部电影的多个动作块可以并行运行。

我确实有上述网络工作并且它确实并行运行,但根据我的计算,只有八到十个动作块同时执行。我计时了每个 ActionBlock 的运行时间,大约为 100-200 毫秒。我可以采取哪些步骤来至少双倍并发?

我确实尝试将操作委托转换为异步方法并在 ActionBlock 操作委托中使数据库访问异步,但它没有帮助。

编辑:我实现了额外级别的数据分区:具有奇数 ID 的电影的帧在 ServerA 上处理,偶数电影的帧在 ServerB 上处理。应用程序的两个实例都访问了同一个数据库。如果我的问题是 DB IO,那么我不会看到处理的总帧数有任何改善(或者很少,低于 20%)。但我确实看到它翻了一番。所以这让我得出结论,Threadpool 没有产生更多线程来并行处理更多帧(两台服务器都是四核的,分析器显示每个应用程序大约有 25-30 个线程)。

4

3 回答 3

2

一些假设:

  • 从您的示例数据中,您正在接收乱序的电影帧(可能还有电影中的帧)

  • 您的ActionBlock<T>实例是通用的;它们都调用相同的处理方法,您只需根据每个电影 ID 创建一个列表(您事先有一个电影 ID 列表),如下所示:

// The movie IDs
IEnumerable<int> movieIds = ...;

// The actions.
var actions = movieIds.Select(
    i => new { Id = i, Action = new ActionBlock<Frame>(MethodToProcessFrame) });

// The buffer block.
BufferBlock<Frame> buffer = ...;

// Link everything up.
foreach (var action in actions) 
{
    // Not necessary in C# 5.0, but still, good practice.
    // The copy of the action.
    var actionCopy = action;

    // Link.
    bufferBlock.LinkTo(actionCopy.Action, f => f.MovieId == actionCopy.Id);
}

如果是这种情况,则说明您创建了太多ActionBlock<T>没有得到工作的实例;因为你的帧(可能还有电影)是乱序的,所以你不能保证所有的ActionBlock<T>实例都会有工作要做。

此外,当您创建一个ActionBlock<T>实例时,它将使用MaxDegreeOfParallelism1 创建,这意味着它是线程安全的,因为只有一个线程可以同时访问该块。

此外,TPL DataFlow 库最终依赖于Task<TResult>,默认情况下它在线程池上调度。线程池将在这里做一些事情:

  • 确保所有处理器内核都已饱和。这与确保您的实例饱和非常不同,是您应该关注的指标ActionBlock<T>

  • 确保在处理器内核饱和时,确保工作均匀分布,并确保没有太多并发任务正在执行(上下文切换代价高昂)。

看起来您处理电影的方法是通用的,并且传入电影中的哪一帧并不重要(如果确实重要,那么您需要用它来更新您的问题,因为它会改变很多事情)。这也意味着它是线程安全的。

此外,如果可以假设一帧的处理不依赖于任何先前帧的处理(或者,看起来电影的帧是按顺序排列的),您可以使用单个 ActionBlock<T>但调整MaxDegreeOfParallelism值,像这样:

// The buffer block.
BufferBlock<Frame> buffer = ...;

// Have *one* ActionBlock<T>
var action = new ActionBlock<Frame>(MethodToProcessFrame,
    // This is where you tweak the concurrency:
    new ExecutionDataflowBlockOptions {
        MaxDegreeOfParallelism = 4,
    }
);

// Link.  No filter needed.
bufferBlock.LinkTo(action);

现在,你ActionBlock<T>永远饱和。诚然,任何负责任的任务调度程序(默认为线程池)仍然会限制最大并发量,但它会尽可能多地同时合理地做。

为此,如果您的操作是真正线程安全的,您可以设置MaxDegreeOfParallelismto DataflowBlockOptions.Unbounded,如下所示:

// Have *one* ActionBlock<T>
var action = new ActionBlock<Frame>(MethodToProcessFrame,
    // This is where you tweak the concurrency:
    new ExecutionDataflowBlockOptions {
        // We're thread-safe, let the scheduler determine
        // how nuts we can go.
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    }
);

当然,所有这些都假设其他一切都是最优的(I/O 读/写等)

于 2012-11-15T19:00:33.680 回答
-1

奇怪的是,这是最佳的并行化程度。老实说,线程池非常擅长确定要激活的实际线程的最佳数量。我的猜测是您的硬件可以支持实际上并行工作的许多并行进程。如果您添加更多内容,您实际上不会增加​​吞吐量,您只会花费更多时间在线程之间进行上下文切换,而实际处理它们的时间会减少。

如果您注意到,在很长一段时间内,您的 CPU 负载、内存总线、网络连接、磁盘访问等都在容量不足的情况下工作,那么您可能遇到了问题,您需要检查一下是什么实际上是瓶颈。尽管某处的某些资源可能已达到其容量,但 TPL 已经认识到这一点并确保它不会过度饱和该资源。

于 2012-11-14T17:01:58.280 回答
-1

我怀疑你是 IO 绑定的。问题是在哪里?在读或写上。你写的数据比读的多吗?CPU 可能低于 50%,因为它无法更快地写入。

我并不是说 ActionBlock 是错误的,但我会考虑使用 BlockingCollection 的生产者消费者。优化您读取和写入数据的方式。

这不同,但我有一个应用程序,我可以在其中阅读文本块。解析文本,然后将单词写回 SQL。我在单个线程上读取,然后并行解析,然后在单个线程上写入。我写在一个线程上,以免破坏索引。如果您受 IO 限制,则需要弄清楚最慢的 IO 是什么,然后优化该过程。

告诉我更多关于那个 IO 的信息。

在您提到从数据库中读取的问题中。
我会试试 BlockingCollections。
BlockingCollection 类
并且每个都有大小限制,这样你就不会浪费内存。
让它足够大,以至于它(几乎)永远不会变空。
最慢步骤后的阻塞集合将变为空。如果你可以并行处理,那么就这样做。
我发现表中的并行插入并不快。
让一个过程锁定并保持它并保持软管打开。
仔细看看你是如何插入的。
一次一排很慢。
我使用 TVP 并一次插入 10,000 个,但很多人喜欢 Drapper 或 BulkInsert。
如果您删除索引和触发器并插入按聚集索引排序将是最快的。拿一个小块并握住它。我正在 10 毫秒范围内插入。
现在更新是最慢的。看看那个——你一次只做一排吗?
看看拿 tabblock 和做视频剪辑。
除非它是一个丑陋的更新,否则它不应该比插入花费更长的时间。

于 2012-11-14T18:52:18.117 回答