一些假设:
// The movie IDs
IEnumerable<int> movieIds = ...;
// The actions.
var actions = movieIds.Select(
i => new { Id = i, Action = new ActionBlock<Frame>(MethodToProcessFrame) });
// The buffer block.
BufferBlock<Frame> buffer = ...;
// Link everything up.
foreach (var action in actions)
{
// Not necessary in C# 5.0, but still, good practice.
// The copy of the action.
var actionCopy = action;
// Link.
bufferBlock.LinkTo(actionCopy.Action, f => f.MovieId == actionCopy.Id);
}
如果是这种情况,则说明您创建了太多ActionBlock<T>
没有得到工作的实例;因为你的帧(可能还有电影)是乱序的,所以你不能保证所有的ActionBlock<T>
实例都会有工作要做。
此外,当您创建一个ActionBlock<T>
实例时,它将使用MaxDegreeOfParallelism
1 创建,这意味着它是线程安全的,因为只有一个线程可以同时访问该块。
此外,TPL DataFlow 库最终依赖于Task<TResult>
类,默认情况下它在线程池上调度。线程池将在这里做一些事情:
看起来您处理电影的方法是通用的,并且传入电影中的哪一帧并不重要(如果确实重要,那么您需要用它来更新您的问题,因为它会改变很多事情)。这也意味着它是线程安全的。
此外,如果可以假设一帧的处理不依赖于任何先前帧的处理(或者,看起来电影的帧是按顺序排列的),您可以使用单个 ActionBlock<T>
但调整MaxDegreeOfParallelism
值,像这样:
// The buffer block.
BufferBlock<Frame> buffer = ...;
// Have *one* ActionBlock<T>
var action = new ActionBlock<Frame>(MethodToProcessFrame,
// This is where you tweak the concurrency:
new ExecutionDataflowBlockOptions {
MaxDegreeOfParallelism = 4,
}
);
// Link. No filter needed.
bufferBlock.LinkTo(action);
现在,你ActionBlock<T>
将永远饱和。诚然,任何负责任的任务调度程序(默认为线程池)仍然会限制最大并发量,但它会尽可能多地同时合理地做。
为此,如果您的操作是真正线程安全的,您可以设置MaxDegreeOfParallelism
to DataflowBlockOptions.Unbounded
,如下所示:
// Have *one* ActionBlock<T>
var action = new ActionBlock<Frame>(MethodToProcessFrame,
// This is where you tweak the concurrency:
new ExecutionDataflowBlockOptions {
// We're thread-safe, let the scheduler determine
// how nuts we can go.
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
}
);
当然,所有这些都假设其他一切都是最优的(I/O 读/写等)