我有一个从包含 100s-1000 行的文本文件中读取的应用程序。
每行都使用 parallel.foreach 进行处理,使用 ParallelOptions 来限制触发的任务数量。这是“控制器”任务。
在这个“控制器”parallel.foreach 中是另一个parallel.foreach,它将执行实际工作。触发的每个控制器任务将执行相同的工作,并使用原始文件行中指定的不同数据输入。再次,这项工作 parallel.foreach 使用并行选项来限制触发的任务数量。
在我上次的测试中,我使用了 Controller foreach: MaxDegreeOfParallelism=4 Worker foreach: MaxDegreeOfParallelism:4
根据我的数学,这应该意味着任何时候最多有 16 个任务在工作。
但是,当我检查 perfmon.exe 时,我可以看到我的应用程序使用了 700 个线程。再过几个小时,这将超过 1000。
怎么会这样?为什么 GC 不收集这些完成的线程?
以前,我的代码在 Thread[] 中触发了具有相同问题的实际线程。然后我将它移到一个 Task[] 并遇到了同样的问题。我假设某处存在线程泄漏,并且引用仍然指向线程/任务。我花了很多时间寻找这个无济于事。
所以我转向parallel.foreach 的想法是,如果我从未创建过对任务的引用,就不会发生任务泄漏,因为这一切都发生在一个lamda 中。
但问题仍然存在。关于为什么会这样的任何想法?还是正常?
添加了下面的代码,由于所有测试和尝试调试这个问题,它有点乱,试图清理一下。
public static void RunActions(
List<paramsActionSettings> listActions,
string[] arrList,
int numThreads,
string domain = null,
delGetParamsActionSettings delGetActionsList = null,
delProcessString callbackActionsComplete = null
)
{
int iCntr= 0;
int iTotal = arrList.Length;
ParallelOptions prlOptions = new ParallelOptions
{
MaxDegreeOfParallelism = numThreads
};
//foreach (string listItemIter in arrList)
object oLock = new object();
Parallel.ForEach(arrList, prlOptions,(listItemIter) =>
{
lock (oLock)
{
Console.WriteLine("starting "+iCntr + " of " + iTotal + " run actions");
iCntr++;
}
string listItemCopySafe = string.Copy(listItemIter);
bool bCanDo = true;
List<paramsActionSettings> listActionsUse;
if (listActions == null)
{
listActionsUse = delGetActionsList();
}
else
{
listActionsUse = listActions;
}
foreach (paramsActionSettings prms in listActionsUse)
{
if (prms.delCanDo != null && !prms.delCanDo(listItemCopySafe, domain))
{
bCanDo = false;
break;
}
}
if (!bCanDo) return;
List<paramsFire> listParams = new List<paramsFire>();
//create a list of paramsfire objects, the object holds the params and the delfunction
foreach (paramsActionSettings prms in listActionsUse)
{
listParams.Add(new paramsFire(prms.delGetDoParams(listItemCopySafe), prms.delDoSomething));
}
FireActions(listParams, callbackActionsComplete, listItemCopySafe);
Console.WriteLine("Finished " + iCntr + " of " + iTotal );
});
}
private static void FireActions(List<paramsFire> list, delProcessString callbackActionsComplete, string itemArr)
{
int icntr = 0;
foreach (paramsFire prms in list)
{
try
{
if (icntr == 0)
{
if (!prms.delDoSomething(prms.oParams))
{
break;
}
}
else
{
prms.delDoSomething(prms.oParams);
}
icntr++;
}
catch (Exception e)
{
ErrorLog.WriteLine("foreach (paramsFire prms in list)");
UtilException.Dump(e, "foreach (paramsFire prms in list)");
}
}
if (callbackActionsComplete != null)
{
try
{
callbackActionsComplete(itemArr);
}
catch { }
}
}