我有一个利用并行化处理数据的应用程序。
主程序在 C# 中,而用于分析数据的例程之一是在外部 C++ dll 上。每次在数据中找到某个信号时,该库都会扫描数据并调用回调。数据应该被收集、分类然后存储到 HD 中。
这是我对回调调用的方法以及排序和存储数据的方法的第一个简单实现:
// collection where saving found signals
List<MySignal> mySignalList = new List<MySignal>();
// method invoked by the callback
private void Collect(int type, long time)
{
lock(locker) { mySignalList.Add(new MySignal(type, time)); }
}
// store signals to disk
private void Store()
{
// sort the signals
mySignalList.Sort();
// file is a object that manages the writing of data to a FileStream
file.Write(mySignalList.ToArray());
}
数据由大小为 10000 xn 的二维数组(short[][] 数据)组成,其中 n 变量。我以这种方式使用并行化:
Parallel.For(0, 10000, (int i) =>
{
// wrapper for the external c++ dll
ProcessData(data[i]);
}
现在对于 10000 个数组中的每一个,我估计可以触发 0 到 4 个回调。我正面临瓶颈,并且鉴于我的 CPU 资源没有被过度使用,我认为锁(连同数千个回调)是问题所在(我是对的还是可能有其他问题?)。我已经尝试过 ConcurrentBag 集合,但性能仍然更差(与其他用户发现一致)。
我认为使用无锁代码的可能解决方案是拥有多个集合。然后有必要制定一种策略,使并行进程的每个线程都在单个集合上工作。例如,集合可以在以线程 ID 作为键的字典中,但我不知道任何 .NET 工具(我应该知道线程 ID 以在启动并行化之前初始化字典)。这个想法是否可行,如果是的话,是否存在一些用于此目的的 .NET 工具?或者,还有其他加快流程的想法吗?
[编辑] 我遵循了 Reed Copsey 的建议并使用了以下解决方案(根据 VS2010 的分析器,在锁定和添加到列表的负担之前占用了 15% 的资源,而现在只有 1%):
// master collection where saving found signals
List<MySignal> mySignalList = new List<MySignal>();
// thread-local storage of data (each thread is working on its List<MySignal>)
ThreadLocal<List<MySignal>> threadLocal;
// analyze data
private void AnalizeData()
{
using(threadLocal = new ThreadLocal<List<MySignal>>(() =>
{ return new List<MySignal>(); }))
{
Parallel.For<int>(0, 10000,
() =>
{ return 0;},
(i, loopState, localState) =>
{
// wrapper for the external c++ dll
ProcessData(data[i]);
return 0;
},
(localState) =>
{
lock(this)
{
// add thread-local lists to the master collection
mySignalList.AddRange(local.Value);
local.Value.Clear();
}
});
}
}
// method invoked by the callback
private void Collect(int type, long time)
{
local.Value.Add(new MySignal(type, time));
}