4

在尝试通过 StreamReader 将 800MB 文本文件加载到 DataTable 时,我遇到了 OutOfMemory 异常。我想知道是否有办法从内存流中批量加载 DataTable,即从 StreamReader 读取文本文件的前 10,000 行,创建 DataTable,使用 DataTable 执行某些操作,然后将接下来的 10,000 行加载到 StreamReader 和很快。

我的谷歌在这里不是很有帮助,但似乎应该有一个简单的方法来做到这一点。最终,我将使用 SqlBulkCopy 将 DataTables 写入 MS SQL 数据库,因此如果有比我所描述的更简单的方法,我将感谢您提供正确方向的快速指针。

编辑 - 这是我正在运行的代码:

public static DataTable PopulateDataTableFromText(DataTable dt, string txtSource)
{

    StreamReader sr = new StreamReader(txtSource);
    DataRow dr;
    int dtCount = dt.Columns.Count;
    string input;
    int i = 0;

    while ((input = sr.ReadLine()) != null)
    {

        try
        {
            string[] stringRows = input.Split(new char[] { '\t' });
            dr = dt.NewRow();
            for (int a = 0; a < dtCount; a++)
            {
                string dataType = dt.Columns[a].DataType.ToString();
                if (stringRows[a] == "" && (dataType == "System.Int32" || dataType == "System.Int64"))
                {
                    stringRows[a] = "0";
                }
                dr[a] = Convert.ChangeType(stringRows[a], dt.Columns[a].DataType);

            }
            dt.Rows.Add(dr);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.ToString());
        }
        i++;
    }
    return dt;
}

这是返回的错误:

“System.OutOfMemoryException:
在 System.String.Split(Char[] separator, Int32 count, StringSplitOptions options)
at System.String.Split(Char[] separator}
at Harvester.Config .PopulateDataTableFromText(DataTable dt, String txtSource) 在 C:...."

关于将数据直接加载到 SQL 中的建议 - 在 C# 方面我有点菜鸟,但我认为这基本上就是我正在做的事情?SqlBulkCopy.WriteToServer 获取我从文本文件创建的 DataTable 并将其导入 sql。有没有更简单的方法可以做到这一点,我错过了?

编辑:哦,我忘了提 - 此代码不会与 SQL Server 在同一台服务器上运行。数据文本文件位于服务器 B 上,需要写入服务器 A 中的表。这是否排除了使用 bcp?

4

4 回答 4

5

您是否考虑过将数据直接加载到 SQL Server 中,然后在数据库中进行操作?数据库引擎已经被设计为以有效的方式执行大量数据的操作。这可能会产生更好的整体结果,并允许您利用数据库和 SQL 语言的功能来完成繁重的工作。这是旧的“更聪明地工作而不是更努力地工作”的原则。

有许多不同的方法可以将数据加载到 SQL Server中,因此您可能需要检查这些方法,看看是否有合适的方法。如果您使用的是 SQLServer 2005 或更高版本,并且确实需要对 C# 中的数据进行一些操作,则始终可以使用托管存储过程

这里要意识到的是,这OutOfMemoryException有点误导。内存不仅仅是您拥有的物理 RAM 的数量。您可能用完的是可寻址内存。这是一个非常不同的事情。

当您将一个大文件加载到内存中并将其转换为一个文件时,DataTable它可能需要远远超过 800Mb 来表示相同的数据。由于 32 位 .NET 进程仅限于 2Gb 以下的可寻址内存,因此您可能永远无法在单个批次中处理如此数量的数据。

您可能需要做的是以流式方式处理数据。换句话说,不要尝试将其全部加载到 a 中DataTable,然后批量插入到 SQLServer。而是以块的形式处理文件,在完成后清除之前的一组行。

现在,如果您可以访问具有大量内存(以避免 VM 抖动)的 64 位计算机和 64 位 .NET 运行时的副本,那么您可能会在不改变运行代码的情况下摆脱困境。但我建议无论如何都要进行必要的更改,因为即使在那种环境下它也可能会提高性能。

于 2010-09-28T20:32:14.100 回答
3

你真的需要分批处理数据吗?或者你可以逐行处理它吗?在后一种情况下,我认为 Linq 在这里可能会很有帮助,因为它可以很容易地通过方法的“管道”流式传输数据。这样你就不需要一次加载很多数据,一次只加载一行

首先,您需要使您的StreamReader可枚举。这可以通过扩展方法轻松完成:

public static class TextReaderExtensions
{
    public static IEnumerable<string> Lines(this TextReader reader)
    {
        string line;
        while((line = reader.ReadLine()) != null)
        {
            yield return line;
        }
    }
}

这样您就可以将StreamReader用作 Linq 查询的源。

然后你需要一个接受字符串并将其转换为 a 的方法DataRow

DataRow ParseDataRow(string input)
{
    // Your parsing logic here
    ...
}

使用这些元素,您可以轻松地将文件中的每一行投影到 DataRow,并使用它做任何您需要的事情:

using (var reader = new StreamReader(fileName))
{
    var rows = reader.Lines().Select(ParseDataRow);
    foreach(DataRow row in rows)
    {
        // Do something with the DataRow
    }
}

(请注意,您可以在不使用 Linq 的情况下通过简单循环执行类似操作,但我认为 Linq 使代码更具可读性......)

于 2010-09-28T21:28:02.890 回答
2

SqlBulkCopy.WriteToServer 具有接受 IDataReader 的重载。您可以将自己的 IDataReader 实现为 StreamReader 的包装器,其中 Read() 方法将使用 StreamReader 中的一行。这样,数据将被“流式传输”到数据库中,而不是首先尝试在内存中将其构建为 DataTable。希望有帮助。

于 2010-09-28T21:21:03.567 回答
0

作为此处其他答案的更新,我也在研究这个问题,并且遇到了这个页面,它提供了一个很好的 C# 示例,用于按块读取文本文件,并行处理,然后批量插入数据库

代码的症结在这个循环中:

//Of note: it's faster to read all the lines we are going to act on and 
            //then process them in parallel instead of reading and processing line by line.
            //Code source: http://cc.davelozinski.com/code/c-sharp-code/read-lines-in-batches-process-in-parallel
            while (blnFileHasMoreLines)
            {
                batchStartTime = DateTime.Now;  //Reset the timer

                //Read in all the lines up to the BatchCopy size or
                //until there's no more lines in the file
                while (intLineReadCounter < BatchSize && !tfp.EndOfData)
                {
                    CurrentLines[intLineReadCounter] = tfp.ReadFields();
                    intLineReadCounter += 1;
                    BatchCount += 1;
                    RecordCount += 1;
                }

                batchEndTime = DateTime.Now;    //record the end time of the current batch
                batchTimeSpan = batchEndTime - batchStartTime;  //get the timespan for stats

                //Now process each line in parallel.
                Parallel.For(0, intLineReadCounter, x =>
                //for (int x=0; x < intLineReadCounter; x++)    //Or the slower single threaded version for debugging
                {
                    List<object> values = null; //so each thread gets its own copy. 

                    if (tfp.TextFieldType == FieldType.Delimited)
                    {
                        if (CurrentLines[x].Length != CurrentRecords.Columns.Count)
                        {
                            //Do what you need to if the number of columns in the current line
                            //don't match the number of expected columns
                            return; //stop now and don't add this record to the current collection of valid records.
                        }

                        //Number of columns match so copy over the values into the datatable
                        //for later upload into a database
                        values = new List<object>(CurrentRecords.Columns.Count);
                        for (int i = 0; i < CurrentLines[x].Length; i++)
                            values.Add(CurrentLines[x][i].ToString());

                        //OR do your own custom processing here if not using a database.
                    }
                    else if (tfp.TextFieldType == FieldType.FixedWidth)
                    {
                        //Implement your own processing if the file columns are fixed width.
                    }

                    //Now lock the data table before saving the results so there's no thread bashing on the datatable
                    lock (oSyncLock)
                    {
                        CurrentRecords.LoadDataRow(values.ToArray(), true);
                    }

                    values.Clear();

                }
                ); //Parallel.For   

                //If you're not using a database, you obviously won't need this next piece of code.
                if (BatchCount >= BatchSize)
                {   //Do the SQL bulk copy and save the info into the database
                    sbc.BatchSize = CurrentRecords.Rows.Count;
                    sbc.WriteToServer(CurrentRecords);

                    BatchCount = 0;         //Reset these values
                    CurrentRecords.Clear(); //  "
                }

                if (CurrentLines[intLineReadCounter] == null)
                    blnFileHasMoreLines = false;    //we're all done, so signal while loop to stop

                intLineReadCounter = 0; //reset for next pass
                Array.Clear(CurrentLines, 0, CurrentLines.Length);

            } //while blnhasmorelines
于 2016-11-09T04:05:36.993 回答