57

我正在使用 Parallel.ForEach 并且我正在做一些数据库更新,现在没有设置 MaxDegreeOfParallelism ,双核处理器机器会导致 sql 客户端超时,而其他四核处理器机器则不会超时。

现在我无法控制我的代码运行时可用的处理器内核类型,但是我可以使用 MaxDegreeOfParallelism 更改一些设置,这些设置可能会同时运行更少的操作并且不会导致超时?

我可以增加超时时间,但这不是一个好的解决方案,如果在较低的 CPU 上我可以同时处理更少的操作,那将减少 cpu 的负载。

好的,我也阅读了所有其他帖子和 MSDN,但是将 MaxDegreeOfParallelism 设置为较低的值会使我的四核机器受到影响吗?

例如,有没有办法做类似的事情,如果 CPU 有两个内核,那么使用 20,如果 CPU 有四个内核,那么使用 40?

4

5 回答 5

77

答案是,它是整个并行运行的上限,与核数无关。

因此,即使您因为等待 IO 或锁而没有使用 CPU,也不会并行运行额外的任务,只有您指定的最大值。

为了解决这个问题,我编写了这段测试代码。那里有一个人工锁来刺激 TPL 使用更多线程。当您的代码等待 IO 或数据库时,也会发生同样的情况。

class Program
{
    static void Main(string[] args)
    {
        var locker = new Object();
        int count = 0;
        Parallel.For
            (0
             , 1000
             , new ParallelOptions { MaxDegreeOfParallelism = 2 }
             , (i) =>
                   {
                       Interlocked.Increment(ref count);
                       lock (locker)
                       {
                           Console.WriteLine("Number of active threads:" + count);
                           Thread.Sleep(10);
                        }
                        Interlocked.Decrement(ref count);
                    }
            );
    }
}

如果我没有指定 MaxDegreeOfParallelism,控制台日志显示最多同时运行大约 8 个任务。像这样:

Number of active threads:6
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:6
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7

它开始时较低,随着时间的推移而增加,最后它试图同时运行 8 个。

如果我将它限制为某个任意值(比如 2),我会得到

Number of active threads:2
Number of active threads:1
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2

哦,这是在四核机器上。

于 2012-03-02T19:34:26.920 回答
24

例如,有没有办法做类似的事情,如果 CPU 有两个内核,那么使用 20,如果 CPU 有四个内核,那么使用 40?

您可以这样做以使并行性取决于 CPU 内核的数量:

var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount * 10 };
Parallel.ForEach(sourceCollection, options, sourceItem =>
{
    // do something
});

但是,较新的 CPU 倾向于使用超线程来模拟额外的内核。因此,如果您有一个四核处理器,那么Environment.ProcessorCount可能会将其报告为 8 核。我发现,如果您将并行度设置为考虑模拟内核,那么它​​实际上会减慢其他线程,例如 UI 线程。

因此,尽管操作会更快完成,但应用程序 UI 在此期间可能会遇到明显的延迟。将“Environment.ProcessorCount”除以 2 似乎可以达到相同的处理速度,同时仍然保持 CPU 可用于 UI 线程。

于 2013-07-26T14:19:50.940 回答
2

听起来您并行运行的代码正在死锁,这意味着除非您能找到并解决导致该问题的问题,否则您根本不应该将其并行化。

于 2012-03-02T18:31:31.690 回答
1

其他需要考虑的事情,特别是对于那些多年后发现这一点的人来说,取决于您的情况,通常最好在 DataTable 中收集所有数据,然后在每个主要任务结束时使用 SqlBulkCopy。

例如,我创建了一个处理数百万个文件的进程,当每个文件事务进行数据库查询以插入记录时,我遇到了相同的错误。相反,我将其全部存储在内存中的 DataTable 中,用于我迭代的每个共享,将 DataTable 转储到我的 SQL Server 并在每个单独的共享之间清除它。批量插入需要一秒钟的时间,并且不会一次打开数千个连接。

编辑:这是一个快速而肮脏的工作示例 SQLBulkCopy 方法:

private static void updateDatabase(DataTable targetTable)
    {
        try
        {
            DataSet ds = new DataSet("FileFolderAttribute");
            ds.Tables.Add(targetTable);
            writeToLog(targetTable.TableName + " - Rows: " + targetTable.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
            writeToLog(@"Opening SQL connection", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
            Console.WriteLine(@"Opening SQL connection");
            SqlConnection sqlConnection = new SqlConnection(sqlConnectionString);
            sqlConnection.Open();
            SqlBulkCopy bulkCopy = new SqlBulkCopy(sqlConnection, SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.FireTriggers | SqlBulkCopyOptions.UseInternalTransaction, null);
            bulkCopy.DestinationTableName = "FileFolderAttribute";
            writeToLog(@"Copying data to SQL Server table", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
            Console.WriteLine(@"Copying data to SQL Server table");
            foreach (var table in ds.Tables)
            {
                writeToLog(table.ToString(), logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                Console.WriteLine(table.ToString());
            }
            bulkCopy.WriteToServer(ds.Tables[0]);

            sqlConnection.Close();
            sqlConnection.Dispose();
            writeToLog(@"Closing SQL connection", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
            writeToLog(@"Clearing local DataTable...", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
            Console.WriteLine(@"Closing SQL connection");
            Console.WriteLine(@"Clearing local DataTable...");
            targetTable.Clear();
            ds.Tables.Remove(targetTable);
            ds.Clear();
            ds.Dispose();
        }
        catch (Exception error)
        {
            errorLogging(error, getCurrentMethod(), logDatabaseFile);
        }
    }

...并将其转储到数据表中:

private static void writeToDataTable(string ServerHostname, string RootDirectory, string RecordType, string Path, string PathDirectory, string PathFileName, string PathFileExtension, decimal SizeBytes, decimal SizeMB, DateTime DateCreated, DateTime DateModified, DateTime DateLastAccessed, string Owner, int PathLength, DateTime RecordWriteDateTime)
    {
        try
        {
            if (tableToggle)
            {
                DataRow toInsert = results_1.NewRow();
                toInsert[0] = ServerHostname;
                toInsert[1] = RootDirectory;
                toInsert[2] = RecordType;
                toInsert[3] = Path;
                toInsert[4] = PathDirectory;
                toInsert[5] = PathFileName;
                toInsert[6] = PathFileExtension;
                toInsert[7] = SizeBytes;
                toInsert[8] = SizeMB;
                toInsert[9] = DateCreated;
                toInsert[10] = DateModified;
                toInsert[11] = DateLastAccessed;
                toInsert[12] = Owner;
                toInsert[13] = PathLength;
                toInsert[14] = RecordWriteDateTime;

                results_1.Rows.Add(toInsert);
            }
            else
            {
                DataRow toInsert = results_2.NewRow();
                toInsert[0] = ServerHostname;
                toInsert[1] = RootDirectory;
                toInsert[2] = RecordType;
                toInsert[3] = Path;
                toInsert[4] = PathDirectory;
                toInsert[5] = PathFileName;
                toInsert[6] = PathFileExtension;
                toInsert[7] = SizeBytes;
                toInsert[8] = SizeMB;
                toInsert[9] = DateCreated;
                toInsert[10] = DateModified;
                toInsert[11] = DateLastAccessed;
                toInsert[12] = Owner;
                toInsert[13] = PathLength;
                toInsert[14] = RecordWriteDateTime;

                results_2.Rows.Add(toInsert);
            }


        }
        catch (Exception error)
        {
            errorLogging(error, getCurrentMethod(), logFile);
        }
    }

...这是上下文,循环片段本身:

private static void processTargetDirectory(DirectoryInfo rootDirectory, string targetPathRoot)
    {
        DateTime StartTime = DateTime.Now;
        int directoryCount = 0;
        int fileCount = 0;
        try
        {                
            manageDataTables();

            Console.WriteLine(rootDirectory.FullName);
            writeToLog(@"Working in Directory: " + rootDirectory.FullName, logFile, getLineNumber(), getCurrentMethod(), true);

            applicationsDirectoryCount++;

            // REPORT DIRECTORY INFO //
            string directoryOwner = "";
            try
            {
                directoryOwner = File.GetAccessControl(rootDirectory.FullName).GetOwner(typeof(System.Security.Principal.NTAccount)).ToString();
            }
            catch (Exception error)
            {
                //writeToLog("\t" + rootDirectory.FullName, logExceptionsFile, getLineNumber(), getCurrentMethod(), true);
                writeToLog("[" + error.Message + "] - " + rootDirectory.FullName, logExceptionsFile, getLineNumber(), getCurrentMethod(), true);
                errorLogging(error, getCurrentMethod(), logFile);
                directoryOwner = "SeparatedUser";
            }

            writeToRawLog(serverHostname + "," + targetPathRoot + "," + "Directory" + "," + rootDirectory.Name + "," + rootDirectory.Extension + "," + 0 + "," + 0 + "," + rootDirectory.CreationTime + "," + rootDirectory.LastWriteTime + "," + rootDirectory.LastAccessTime + "," + directoryOwner + "," + rootDirectory.FullName.Length + "," + DateTime.Now + "," + rootDirectory.FullName + "," + "", logResultsFile, true, logFile);
            //writeToDBLog(serverHostname, targetPathRoot, "Directory", rootDirectory.FullName, "", rootDirectory.Name, rootDirectory.Extension, 0, 0, rootDirectory.CreationTime, rootDirectory.LastWriteTime, rootDirectory.LastAccessTime, directoryOwner, rootDirectory.FullName.Length, DateTime.Now);
            writeToDataTable(serverHostname, targetPathRoot, "Directory", rootDirectory.FullName, "", rootDirectory.Name, rootDirectory.Extension, 0, 0, rootDirectory.CreationTime, rootDirectory.LastWriteTime, rootDirectory.LastAccessTime, directoryOwner, rootDirectory.FullName.Length, DateTime.Now);

            if (rootDirectory.GetDirectories().Length > 0)
            {
                Parallel.ForEach(rootDirectory.GetDirectories(), new ParallelOptions { MaxDegreeOfParallelism = directoryDegreeOfParallelism }, dir =>
                {
                    directoryCount++;
                    Interlocked.Increment(ref threadCount);
                    processTargetDirectory(dir, targetPathRoot);
                });

            }

            // REPORT FILE INFO //
            Parallel.ForEach(rootDirectory.GetFiles(), new ParallelOptions { MaxDegreeOfParallelism = fileDegreeOfParallelism }, file =>
            {
                applicationsFileCount++;
                fileCount++;
                Interlocked.Increment(ref threadCount);
                processTargetFile(file, targetPathRoot);
            });

        }
        catch (Exception error)
        {
            writeToLog(error.Message, logExceptionsFile, getLineNumber(), getCurrentMethod(), true);
            errorLogging(error, getCurrentMethod(), logFile);
        }
        finally
        {
            Interlocked.Decrement(ref threadCount);
        }

        DateTime EndTime = DateTime.Now;
        writeToLog(@"Run time for " + rootDirectory.FullName + @" is: " + (EndTime - StartTime).ToString() + @" | File Count: " + fileCount + @", Directory Count: " + directoryCount, logTimingFile, getLineNumber(), getCurrentMethod(), true);
    }

如上所述,这既快又脏,但效果很好。

对于我遇到大约 2,000,000 条记录后遇到的与内存相关的问题,我必须创建第二个 DataTable 并在这两个数据表之间交替,在交替之间将记录转储到 SQL 服务器。所以我的 SQL 连接由每 100,000 条记录中的 1 个组成。

我是这样管理的:

private static void manageDataTables()
    {
        try
        {
            Console.WriteLine(@"[Checking datatable size] toggleValue: " + tableToggle + " | " + @"r1: " + results_1.Rows.Count + " - " + @"r2: " + results_2.Rows.Count);
            if (tableToggle)
            {
                int rowCount = 0;
                if (results_1.Rows.Count > datatableRecordCountThreshhold)
                {
                    tableToggle ^= true;
                    writeToLog(@"results_1 row count > 100000 @ " + results_1.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                    rowCount = results_1.Rows.Count;
                    logResultsFile = "FileServerReport_Results_" + DateTime.Now.ToString("yyyyMMdd-HHmmss") + ".txt";
                    Thread.Sleep(5000);
                    if (results_1.Rows.Count != rowCount)
                    {
                        writeToLog(@"results_1 row count increased, @ " + results_1.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                        rowCount = results_1.Rows.Count;
                        Thread.Sleep(15000);
                    }
                    writeToLog(@"results_1 row count stopped increasing, updating database...", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                    updateDatabase(results_1);
                    results_1.Clear();
                    writeToLog(@"results_1 cleared, count: " + results_1.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                }

            }
            else
            {
                int rowCount = 0;
                if (results_2.Rows.Count > datatableRecordCountThreshhold)
                {
                    tableToggle ^= true;
                    writeToLog(@"results_2 row count > 100000 @ " + results_2.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                    rowCount = results_2.Rows.Count;
                    logResultsFile = "FileServerReport_Results_" + DateTime.Now.ToString("yyyyMMdd-HHmmss") + ".txt";
                    Thread.Sleep(5000);
                    if (results_2.Rows.Count != rowCount)
                    {
                        writeToLog(@"results_2 row count increased, @ " + results_2.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                        rowCount = results_2.Rows.Count;
                        Thread.Sleep(15000);
                    }
                    writeToLog(@"results_2 row count stopped increasing, updating database...", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                    updateDatabase(results_2);
                    results_2.Clear();
                    writeToLog(@"results_2 cleared, count: " + results_2.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                }
            }
        }
        catch (Exception error)
        {
            errorLogging(error, getCurrentMethod(), logDatabaseFile);
        }
    }

其中“datatableRecordCountThreshhold = 100000”

于 2019-02-08T22:45:43.177 回答
-2

它设置并行运行的线程数...

于 2012-03-02T18:29:00.257 回答