79

我必须从文本文件中插入大约 200 万行。

通过插入,我必须创建一些主表。

将如此庞大的数据集插入 SQL Server 的最佳且快速的方法是什么?

4

7 回答 7

73
  1. 我认为最好在 DataSet 中读取文本文件的数据

  2. 试用 SqlBulkCopy -从 C# 应用程序批量插入 SQL

    // connect to SQL
    using (SqlConnection connection = new SqlConnection(connString))
    {
        // make sure to enable triggers
        // more on triggers in next post
        SqlBulkCopy bulkCopy = new SqlBulkCopy(
            connection, 
            SqlBulkCopyOptions.TableLock | 
            SqlBulkCopyOptions.FireTriggers | 
            SqlBulkCopyOptions.UseInternalTransaction,
            null
            );
    
        // set the destination table name
        bulkCopy.DestinationTableName = this.tableName;
        connection.Open();
    
        // write the data in the "dataTable"
        bulkCopy.WriteToServer(dataTable);
        connection.Close();
    }
    // reset
    this.dataTable.Clear();
    

或者

在顶部执行第 1 步后

  1. 从数据集创建 XML
  2. 将 XML 传递到数据库并进行批量插入

您可以查看这篇文章以了解详细信息:使用 C# DataTable 和 SQL 服务器 OpenXML 函数批量插入数据

但它没有用 200 万条记录进行测试,它只会消耗机器上的内存,因为你必须加载 200 万条记录并插入它。

于 2012-12-05T11:18:12.367 回答
61

你可以试试SqlBulkCopy上课。

让您可以使用来自其他源的数据有效地批量加载 SQL Server 表。

有一篇很酷的博客文章介绍了如何使用它。

于 2012-12-05T11:23:35.903 回答
29

重新解决 SqlBulkCopy 的问题:

我使用 StreamReader 来转换和处理文本文件。结果是我的对象列表。

我创建了一个类而不是Datatable一个或List<T>一个缓冲区大小(CommitBatchSize)。它将使用扩展名(在第二类中)将列表转换为数据表。

它工作得非常快。在我的 PC 上,我能够在 10 秒内插入超过 1000 万条复杂记录。

这是课程:

using System;
using System.Collections;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace DAL
{

public class BulkUploadToSql<T>
{
    public IList<T> InternalStore { get; set; }
    public string TableName { get; set; }
    public int CommitBatchSize { get; set; }=1000;
    public string ConnectionString { get; set; }

    public void Commit()
    {
        if (InternalStore.Count>0)
        {
            DataTable dt;
            int numberOfPages = (InternalStore.Count / CommitBatchSize)  + (InternalStore.Count % CommitBatchSize == 0 ? 0 : 1);
            for (int pageIndex = 0; pageIndex < numberOfPages; pageIndex++)
                {
                    dt= InternalStore.Skip(pageIndex * CommitBatchSize).Take(CommitBatchSize).ToDataTable();
                BulkInsert(dt);
                }
        } 
    }

    public void BulkInsert(DataTable dt)
    {
        using (SqlConnection connection = new SqlConnection(ConnectionString))
        {
            // make sure to enable triggers
            // more on triggers in next post
            SqlBulkCopy bulkCopy =
                new SqlBulkCopy
                (
                connection,
                SqlBulkCopyOptions.TableLock |
                SqlBulkCopyOptions.FireTriggers |
                SqlBulkCopyOptions.UseInternalTransaction,
                null
                );

            // set the destination table name
            bulkCopy.DestinationTableName = TableName;
            connection.Open();

            // write the data in the "dataTable"
            bulkCopy.WriteToServer(dt);
            connection.Close();
        }
        // reset
        //this.dataTable.Clear();
    }

}

public static class BulkUploadToSqlHelper
{
    public static DataTable ToDataTable<T>(this IEnumerable<T> data)
    {
        PropertyDescriptorCollection properties =
            TypeDescriptor.GetProperties(typeof(T));
        DataTable table = new DataTable();
        foreach (PropertyDescriptor prop in properties)
            table.Columns.Add(prop.Name, Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType);
        foreach (T item in data)
        {
            DataRow row = table.NewRow();
            foreach (PropertyDescriptor prop in properties)
                row[prop.Name] = prop.GetValue(item) ?? DBNull.Value;
            table.Rows.Add(row);
        }
        return table;
    }
}

}

这是我想插入自定义对象列表List<PuckDetection>( ListDetections) 的示例:

var objBulk = new BulkUploadToSql<PuckDetection>()
{
        InternalStore = ListDetections,
        TableName= "PuckDetections",
        CommitBatchSize=1000,
        ConnectionString="ENTER YOU CONNECTION STRING"
};
objBulk.Commit();

BulkInsert如果需要,可以修改该类以添加列映射。例如,您有一个 Identity 键作为第一列。(假设数据表中的列名与数据库相同)

//ADD COLUMN MAPPING
foreach (DataColumn col in dt.Columns)
{
        bulkCopy.ColumnMappings.Add(col.ColumnName, col.ColumnName);
}
于 2016-10-20T13:53:21.443 回答
6

我使用 bcp 实用程序。(批量复制程序)我每月加载大约 150 万条文本记录。每个文本记录的宽度为 800 个字符。在我的服务器上,将 150 万条文本记录添加到 SQL Server 表中大约需要 30 秒。

bcp 的说明位于http://msdn.microsoft.com/en-us/library/ms162802.aspx

于 2013-04-05T15:12:23.020 回答
3

我最近遇到了这种情况(超过 700 万行),并通过 powershell 使用 sqlcmd(在将原始数据解析为 SQL 插入语句之后)一次 5,000 段(SQL 不能一次处理 700 万行)甚至 500,000 行,除非它分解成更小的 5K 片段。然后您可以一个接一个地运行每个 5K 脚本。)因为我需要利用 SQL Server 2012 Enterprise 中的新序列命令。我找不到使用所述序列命令快速有效地插入 700 万行数据的编程方式。

其次,一次插入一百万行或更多数据时要注意的一件事是插入过程中的 CPU 和内存消耗(主要是内存)。SQL 将在不释放上述进程的情况下完成如此规模的工作占用内存/CPU。不用说,如果您的服务器上没有足够的处理能力或内存,您可以很容易地在短时间内使它崩溃(我发现很难)。如果你的内存消耗超过 70-75%,只需重新启动服务器,进程就会恢复正常。

在我真正制定最终执行计划之前,我必须运行一堆试错测试来查看我的服务器的限制是什么(考虑到可使用的 CPU/内存资源有限)。我建议您在将其投入生产之前在测试环境中执行相同的操作。

于 2012-12-05T16:25:34.093 回答
2

我尝试了这种方法,它显着减少了我的数据库插入执行时间。

List<string> toinsert = new List<string>();
StringBuilder insertCmd = new StringBuilder("INSERT INTO tabblename (col1, col2, col3) VALUES ");

foreach (var row in rows)
{
      // the point here is to keep values quoted and avoid SQL injection
      var first = row.First.Replace("'", "''")
      var second = row.Second.Replace("'", "''")
      var third = row.Third.Replace("'", "''")

      toinsert.Add(string.Format("( '{0}', '{1}', '{2}' )", first, second, third));
}
if (toinsert.Count != 0)
{
      insertCmd.Append(string.Join(",", toinsert));
      insertCmd.Append(";");
}
using (MySqlCommand myCmd = new MySqlCommand(insertCmd.ToString(), SQLconnectionObject))
{
      myCmd.CommandType = CommandType.Text;
      myCmd.ExecuteNonQuery();
}

*创建 SQL 连接对象并将其替换为我编写 SQLconnectionObject 的位置。

于 2018-12-25T06:22:58.510 回答
0

我遇到了一个应该与 ADO、Entity 和 Dapper 一起使用的解决方案的问题,所以制作了这个库;它以以下形式生成批次:

    IEnumerable<(string SqlQuery, IEnumerable<SqlParameter> SqlParameters)>  
    IEnumerable<(string SqlQuery, DynamicParameters DapperDynamicParameters)> 

此链接包含说明。它对 SQL 注入是安全的,因为使用参数而不是连接;如果需要,您也可以通过可选参数将标识插入设置为 ON。

与 ADO.NET 一起使用:

using MsSqlHelpers;
// ...
var mapper = new MapperBuilder<Person>()
    .SetTableName("People")
    .AddMapping(person => person.FirstName, columnName: "Name")
    .AddMapping(person => person.LastName, columnName: "Surename")
    .AddMapping(person => person.DateOfBirth, columnName: "Birthday")
    .Build();
var people = new List<Person>()
{ 
    new Person()
    {
        FirstName = "John", 
        LastName = "Lennon", 
        DateOfBirth = new DateTime(1940, 10, 9) 
    },
    new Person()
    {
        FirstName = "Paul", 
        LastName = "McCartney", 
        DateOfBirth = new DateTime(1942, 6, 18) 
    },
};
var connectionString = "Server=SERVER_ADDRESS;Database=DATABASE_NAME;User Id=USERNAME;Password=PASSWORD;";
var sqlQueriesAndParameters = new MsSqlQueryGenerator()
    .GenerateParametrizedBulkInserts(mapper, people);

using (var sqlConnection = new SqlConnection(connectionString))
{
    sqlConnection.Open();
    
    // Default batch size: 1000 rows or (2100-1) parameters per insert.
    foreach (var (SqlQuery, SqlParameters) in sqlQueriesAndParameters)
    {
        using (SqlCommand sqlCommand = new SqlCommand(SqlQuery, sqlConnection))
        {
            sqlCommand.Parameters.AddRange(SqlParameters.ToArray());
            sqlCommand.ExecuteNonQuery();
        }
    }
}

与 Dapper 一起使用:

using MsSqlHelpers;
// ...
var mapper = new MapperBuilder<Person>()
    .SetTableName("People")
    .AddMapping(person => person.FirstName, columnName: "Name")
    .AddMapping(person => person.LastName, columnName: "Surename")
    .AddMapping(person => person.DateOfBirth, columnName: "Birthday")
    .Build();
var people = new List<Person>()
{ 
    new Person()
    {
        FirstName = "John", 
        LastName = "Lennon", 
        DateOfBirth = new DateTime(1940, 10, 9) 
    },
    new Person()
    { 
        FirstName = "Paul", 
        LastName = "McCartney", 
        DateOfBirth = new DateTime(1942, 6, 18) 
    },
};
var connectionString = "Server=SERVER_ADDRESS;Database=DATABASE_NAME;User Id=USERNAME;Password=PASSWORD;";
var sqlQueriesAndDapperParameters = new MsSqlQueryGenerator()
    .GenerateDapperParametrizedBulkInserts(mapper, people);

using (var sqlConnection = new SqlConnection(connectionString))
{
    // Default batch size: 1000 rows or (2100-1) parameters per insert.
    foreach (var (SqlQuery, DapperDynamicParameters) in sqlQueriesAndDapperParameters)
    {
        sqlConnection.Execute(SqlQuery, DapperDynamicParameters);
    }
}

与实体框架一起使用:

using MsSqlHelpers;
// ...
var mapper = new MapperBuilder<Person>()
    .SetTableName("People")
    .AddMapping(person => person.FirstName, columnName: "Name")
    .AddMapping(person => person.LastName, columnName: "Surename")
    .AddMapping(person => person.DateOfBirth, columnName: "Birthday")
    .Build();
var people = new List<Person>()
{ 
    new Person() 
    { 
        FirstName = "John", 
        LastName = "Lennon", 
        DateOfBirth = new DateTime(1940, 10, 9) 
    },
    new Person()
    { 
        FirstName = "Paul", 
        LastName = "McCartney", 
        DateOfBirth = new DateTime(1942, 6, 18) 
    },
};
var sqlQueriesAndParameters = new MsSqlQueryGenerator()
    .GenerateParametrizedBulkInserts(mapper, people);

// Default batch size: 1000 rows or (2100-1) parameters per insert.
foreach (var (SqlQuery, SqlParameters) in sqlQueriesAndParameters)
{
    _context.Database.ExecuteSqlRaw(SqlQuery, SqlParameters);
    // Depracated but still works: _context.Database.ExecuteSqlCommand(SqlQuery, SqlParameters);
}
于 2021-04-10T19:44:02.320 回答