3

我无法使用 C# 从 SQL Server 获取超过 700000 行 - 我收到“内存不足”异常。请帮帮我。

这是我的代码:

using (SqlConnection sourceConnection = new SqlConnection(constr))
{
    sourceConnection.Open();

    SqlCommand commandSourceData = new SqlCommand("select * from XXXX ", sourceConnection);

    reader = commandSourceData.ExecuteReader();
}

using (SqlBulkCopy bulkCopy = new SqlBulkCopy(constr2))
{
    bulkCopy.DestinationTableName = "destinationTable";

    try
    {
        // Write from the source to the destination.
        bulkCopy.WriteToServer(reader);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }
    finally
    {
        reader.Close();
    }
}

我已经根据给定的解决方案 1 制作了小型控制台应用程序,但最终出现了同样的异常,我在 处理之前和之后发布了我的内存过程:在此处输入图像描述

处理后

在读码端添加命令超时后,Ram Peaks up, 在此处输入图像描述

4

3 回答 3

3

该代码不应导致 OOM 异常。当您将 DataReader 传递给 SqlBulkCopy.WriteToServer 时,您正在将行从源流式传输到目标。在其他地方,您将内容保留在内存中。

SqlBulkCopy.BatchSize控制 SQL Server 提交在目标加载的行的频率,限制锁定持续时间和日志文件增长(如果不是最低限度记录和简单恢复模式)。无论您是否使用一批,都不会影响 SQL Server 或客户端中使用的内存量。

这是一个在不增加内存的情况下复制 10M 行的示例:

using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace SqlBulkCopyTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var src = "server=localhost;database=tempdb;integrated security=true";
            var dest = src;

            var sql = "select top (1000*1000*10) m.* from sys.messages m, sys.messages m2";

            var destTable = "dest";

            using (var con = new SqlConnection(dest))
            {
                con.Open();
                var cmd = con.CreateCommand();
                cmd.CommandText = $"drop table if exists {destTable}; with q as ({sql}) select * into {destTable} from q where 1=2";
                cmd.ExecuteNonQuery();
            }

            Copy(src, dest, sql, destTable);
            Console.WriteLine("Complete.  Hit any key to exit.");
            Console.ReadKey();
        }

        static void Copy(string sourceConnectionString, string destinationConnectionString, string query, string destinationTable)
        {
            using (SqlConnection sourceConnection = new SqlConnection(sourceConnectionString))
            {
                sourceConnection.Open();

                SqlCommand commandSourceData = new SqlCommand(query, sourceConnection);

                var reader = commandSourceData.ExecuteReader();

                using (SqlBulkCopy bulkCopy = new SqlBulkCopy(destinationConnectionString))
                {
                    bulkCopy.BulkCopyTimeout = 60 * 10;
                    bulkCopy.DestinationTableName = destinationTable;
                    bulkCopy.NotifyAfter = 10000;
                    bulkCopy.SqlRowsCopied += (s, a) =>
                    {
                        var mem = GC.GetTotalMemory(false);
                        Console.WriteLine($"{a.RowsCopied:N0} rows copied.  Memory {mem:N0}");
                    };
                     // Write from the source to the destination.
                     bulkCopy.WriteToServer(reader);

                }
            }
        }


    }
}

哪个输出:

. . .
9,830,000 rows copied.  Memory 1,756,828
9,840,000 rows copied.  Memory 798,364
9,850,000 rows copied.  Memory 4,042,396
9,860,000 rows copied.  Memory 3,092,124
9,870,000 rows copied.  Memory 2,133,660
9,880,000 rows copied.  Memory 1,183,388
9,890,000 rows copied.  Memory 3,673,756
9,900,000 rows copied.  Memory 1,601,044
9,910,000 rows copied.  Memory 3,722,772
9,920,000 rows copied.  Memory 1,642,052
9,930,000 rows copied.  Memory 3,763,780
9,940,000 rows copied.  Memory 1,691,204
9,950,000 rows copied.  Memory 3,812,932
9,960,000 rows copied.  Memory 1,740,356
9,970,000 rows copied.  Memory 3,862,084
9,980,000 rows copied.  Memory 1,789,508
9,990,000 rows copied.  Memory 3,903,044
10,000,000 rows copied.  Memory 1,830,468
Complete.  Hit any key to exit.
于 2018-11-26T16:53:02.080 回答
0

注意:根据DavidBrowne 的回答,我似乎误解了 SqlBulkCopy 类的批处理是如何工作的。重构的代码可能对您仍然有用,所以我没有删除这个答案(因为代码仍然有效),但答案不是像我相信的那样设置 BatchSize。请参阅大卫的答案以获得解释。


尝试这样的事情;关键是设置BatchSize属性以限制一次处理的行数:

using (SqlConnection sourceConnection = new SqlConnection(constr))
{
    sourceConnection.Open();
    SqlCommand commandSourceData = new SqlCommand("select * from XXXX ", sourceConnection);
    using (reader = commandSourceData.ExecuteReader() { //add a using statement for your reader so you don't need to worry about close/dispose

        //keep the connection open or we'll be trying to read from a closed connection

        using (SqlBulkCopy bulkCopy = new SqlBulkCopy(constr2))
        {
            bulkCopy.BatchSize = 1000; //Write a few pages at a time rather than all at once; thus lowering memory impact.  See https://docs.microsoft.com/en-us/dotnet/api/system.data.sqlclient.sqlbulkcopy.batchsize?view=netframework-4.7.2
            bulkCopy.DestinationTableName = "destinationTable";

            try
            {
                // Write from the source to the destination.
                bulkCopy.WriteToServer(reader);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                throw; //we've caught the top level Exception rather than somethign specific; so once we've logged it, rethrow it for a proper handler to deal with up the call stack
            }
        }
    }

}

请注意,由于SqlBulkCopy该类将 aIDataReader作为参数,我们不需要下载完整的数据集。相反,阅读器为我们提供了一种根据需要拉回记录的方法(因此我们在创建阅读器后保持连接打开)。当我们调用SqlBulkCopy'sWriteToServer方法时,它在内部具有循环多次的逻辑,BatchSize从读取器中选择新记录,然后在读取器发送完所有待处理记录后重复/完成之前将它们推送到目标表。这与 a 的工作方式不同,DataTable我们必须用完整的记录集填充数据表,而不是能够根据需要读取更多内容。

这种方法的一个潜在风险是,因为我们必须保持连接打开,所以在我们关闭阅读器之前,我们的源上的任何锁定都会保留。根据隔离级别以及其他查询是否尝试访问相同的记录,这可能会导致阻塞;而数据表方法将一次性将数据复制到内存中,然后关闭连接,避免任何阻塞。如果此阻塞是一个问题,您应该考虑更改查询的隔离级别,或应用提示......不过,您如何处理这将取决于要求。

注意:实际上,您不想按原样运行上述代码,而是要稍微重构一下,因此每个方法的范围都包含在内。这样,您可以重用此逻辑将其他查询复制到其他表。您还希望使批量大小可配置而不是硬编码,以便您可以调整到一个值,该值可以很好地平衡资源使用与性能(这将根据主机的资源而有所不同)。
您可能还想使用async方法来允许程序的其他部分在您等待数据从/流入数据库时​​继续进行。

这是一个稍微修改的版本:

public Task<SqlDataReader> async ExecuteReaderAsync(string connectionString, string query) 
{
    SqlConnection connection;
    SqlCommand command; 
    try 
    {
        connection = new SqlConnection(connectionString); //not in a using as we want to keep the connection open until our reader's finished with it.
        connection.Open();
        command = new SqlCommand(query, connection);
        return await command.ExecuteReaderAsync(CommandBehavior.CloseConnection);  //tell our reader to close the connection when done.
    } 
    catch 
    {
        //if we have an issue before we've returned our reader, dispose of our objects here
        command?.Dispose();
        connection?.Dispose();
        //then rethrow the exception
        throw;
    }
}
public async Task CopySqlDataAsync(string sourceConnectionString, string sourceQuery, string destinationConnectionString, string destinationTableName, int batchSize)
{
    using (var reader = await ExecuteReaderAsync(sourceConnectionString, sourceQuery))
        await CopySqlDataAsync(reader, destinationConnectionString, destinationTableName, batchSize);
}
public async Task CopySqlDataAsync(IDataReader sourceReader, string destinationConnectionString, string destinationTableName, int batchSize)
{
    using (SqlBulkCopy bulkCopy = new SqlBulkCopy(destinationConnectionString))
    {
        bulkCopy.BatchSize = batchSize; 
        bulkCopy.DestinationTableName = destinationTableName;
        await bulkCopy.WriteToServerAsync(sourceReader);
    }
}
public void CopySqlDataExample()
{
    try 
    {
        var constr = ""; //todo: define connection string; ideally pulling from config 
        var constr2 = ""; //todo: define connection string #2; ideally pulling from config 
        var batchSize = 1000; //todo: replace hardcoded batch size with value from config
        var task = CopySqlDataAsync(constr, "select * from XXXX", constr2, "destinationTable", batchSize); 
        task.Wait(); //waits for the current task to complete / if any exceptions will throw an aggregate exception
    } 
    catch (AggregateException es)
    {
        var e = es.InnerExceptions[0]; //get the wrapped exception 
        Console.WriteLine(e.Message);
        //throw; //to rethrow AggregateException 
        ExceptionDispatchInfo.Capture(e).Throw(); //to rethrow the wrapped exception
    }
}
于 2018-11-26T14:10:21.693 回答
0

如果您甚至尝试在 C# 中处理 700k 行,那么您的设计就会出现严重错误。你在这方面失败是意料之中的。

如果这是用于显示的数据检索:用户无法处理这么多数据。从 GUI 中的 700k 行过滤只是浪费时间和带宽。一次 25-100 个字段大约是极限。在查询端进行过滤或分页,这样您最终检索的数量级不会超过实际处理的数量。

如果这是某种形式的批量插入或批量修改:在 SQL Server 中执行这种操作,而不是在您的代码中。在 C# 中检索、处理然后回发只会增加开销层。如果您添加 2 路网络传输,您将轻松将所需时间增加三倍。

于 2018-11-26T13:25:21.897 回答