1

我有一个从包含 100s-1000 行的文本文件中读取的应用程序。

每行都使用 parallel.foreach 进行处理,使用 ParallelOptions 来限制触发的任务数量。这是“控制器”任务。

在这个“控制器”parallel.foreach 中是另一个parallel.foreach,它将执行实际工作。触发的每个控制器任务将执行相同的工作,并使用原始文件行中指定的不同数据输入。再次,这项工作 parallel.foreach 使用并行选项来限制触发的任务数量。

在我上次的测试中,我使用了 Controller foreach: MaxDegreeOfParallelism=4 Worker foreach: MaxDegreeOfParallelism:4

根据我的数学,这应该意味着任何时候最多有 16 个任务在工作。

但是,当我检查 perfmon.exe 时,我可以看到我的应用程序使用了 700 个线程。再过几个小时,这将超过 1000。

怎么会这样?为什么 GC 不收集这些完成的线程?

以前,我的代码在 Thread[] 中触发了具有相同问题的实际线程。然后我将它移到一个 Task[] 并遇到了同样的问题。我假设某处存在线程泄漏,并且引用仍然指向线程/任务。我花了很多时间寻找这个无济于事。

所以我转向parallel.foreach 的想法是,如果我从未创建过对任务的引用,就不会发生任务泄漏,因为这一切都发生在一个lamda 中。

但问题仍然存在。关于为什么会这样的任何想法?还是正常?

添加了下面的代码,由于所有测试和尝试调试这个问题,它有点乱,试图清理一下。

public static void RunActions(
    List<paramsActionSettings> listActions,
    string[] arrList,
    int numThreads,
    string domain = null,
    delGetParamsActionSettings delGetActionsList = null,
    delProcessString callbackActionsComplete = null
    )
{

    int iCntr= 0;
    int iTotal = arrList.Length;


    ParallelOptions prlOptions = new ParallelOptions
    {
        MaxDegreeOfParallelism = numThreads
    };

    //foreach (string listItemIter in arrList)
    object oLock = new object();
    Parallel.ForEach(arrList, prlOptions,(listItemIter) =>
    {
        lock (oLock)
        {
            Console.WriteLine("starting "+iCntr + " of " + iTotal + " run actions");
            iCntr++;
        }

        string listItemCopySafe = string.Copy(listItemIter);

        bool bCanDo = true;
        List<paramsActionSettings> listActionsUse;
        if (listActions == null)
        {
            listActionsUse = delGetActionsList();
        }
        else
        {
            listActionsUse = listActions;
        }
        foreach (paramsActionSettings prms in listActionsUse)
        {
            if (prms.delCanDo != null && !prms.delCanDo(listItemCopySafe, domain))
            {
                bCanDo = false;
                break;
            }
        }
        if (!bCanDo) return;


        List<paramsFire> listParams = new List<paramsFire>();

        //create a list of paramsfire objects, the object holds the params and the delfunction
        foreach (paramsActionSettings prms in listActionsUse)
        {
            listParams.Add(new paramsFire(prms.delGetDoParams(listItemCopySafe), prms.delDoSomething));
        }


        FireActions(listParams, callbackActionsComplete, listItemCopySafe);
        Console.WriteLine("Finished " + iCntr + " of " + iTotal );
    }); 
}



private static void FireActions(List<paramsFire> list, delProcessString callbackActionsComplete, string itemArr)
{
    int icntr = 0;
    foreach (paramsFire prms in list)
    {
        try
        {
            if (icntr == 0) 
            {
                if (!prms.delDoSomething(prms.oParams))
                {
                    break;
                }
            }
            else
            {
                prms.delDoSomething(prms.oParams); 
            }
            icntr++;

        }
        catch (Exception e)
        {
            ErrorLog.WriteLine("foreach (paramsFire prms in list)");
            UtilException.Dump(e, "foreach (paramsFire prms in list)");
        }
    } 
     if (callbackActionsComplete != null)
    {
        try
        {
            callbackActionsComplete(itemArr);
        }
        catch { }
    }
}
4

2 回答 2

1

我做了一点重写。我的机器上最多只能有 22 个线程(8 个 proc 框)。随意使用以下任何更改:

    public static void RunActions(
        IEnumerable<paramsActionSettings> listActions,
        IEnumerable<string> arrList,
        int numThreads,
        string domain = null,
        delGetParamsActionSettings delGetActionsList = null,
        delProcessString callbackActionsComplete = null)
    {

        var cntr = 0;
        var total = arrList.Count();

        var prlOptions = new ParallelOptions
        {
            MaxDegreeOfParallelism = numThreads
        };

        ////foreach (var listItemIter in arrList)
        Parallel.ForEach(arrList, prlOptions, listItemIter =>
        {
            Interlocked.Increment(ref cntr);
            Console.WriteLine("starting " + cntr + " of " + total + " run actions");

            var listItemCopySafe = string.Copy(listItemIter);

            var listActionsUse = listActions ??
                ((delGetActionsList == null) ? new paramsActionSettings[0] : delGetActionsList());
            var canDo = listActionsUse.All(prms => prms.delCanDo == null
                || prms.delCanDo(listItemCopySafe, domain));

            if (!canDo)
            {
                return;
            }

            var listParams = listActionsUse.Select(prms => new paramsFire(
                prms.delGetDoParams(listItemCopySafe),
                prms.delDoSomething));

            // create a list of paramsfire objects, the object holds the params and the delfunction
            FireActions(listParams, callbackActionsComplete, listItemCopySafe);
            Console.WriteLine("Finished " + cntr + " of " + total);
        });
    }

    private static void FireActions(
        IEnumerable<paramsFire> list,
        delProcessString callbackActionsComplete,
        string itemArr)
    {
        var icntr = 0;

        foreach (var prms in list)
        {
            try
            {
                if (icntr == 0)
                {
                    if (!prms.delDoSomething(prms.oParams))
                    {
                        break;
                    }
                }
                else
                {
                    prms.delDoSomething(prms.oParams);
                }

                icntr++;
            }
            catch (Exception e)
            {
                ErrorLog.WriteLine("foreach (paramsFire prms in list)");
                UtilException.Dump(e, "foreach (paramsFire prms in list)");
            }
        }

        if (callbackActionsComplete == null)
        {
            return;
        }

        try
        {
            callbackActionsComplete(itemArr);
        }
        catch
        {
        }
    }
于 2012-07-11T15:34:04.440 回答
1

显然,问题不在于任何特定的 API,正如您所说的线程、任务和Parallel.ForEach.

不要问为什么框架没有做它的工作(因为它是)。问一下,为什么你的代码产生的线程比你预期的要多。如果没有看到更多代码,这个问题就无法完全回答。

于 2012-07-10T19:11:12.527 回答