2

我遇到了一个超出我的谷歌搜索能力的问题,如果有任何帮助,我将不胜感激。

我创建了一个非常简单的 SSIS 包,用于使用 OLEDB 连接将数据从 Azure SQL 数据库中的源表复制到 MS SQL 数据库中的目标。目标表是从源数据库生成的脚本创建的(在 SSMS 中,我右键单击数据库名称 -> 任务 -> 生成脚本),因此它们应该是相同的。SSIS 包不进行任何转换——它是对目标表的简单截断,然后是具有连接到 OleDBDestination 的 OleDBSource 的数据流任务。

但是,我在执行包时收到以下错误:

Hresult: 0x80004005  Description: "Violation of PRIMARY KEY constraint 'PK_HashKey'. Cannot insert
duplicate key in object 'TargetTable'. The duplicate key value is (                          ).".

所讨论的主键列的数据类型为 char(32),并使用 HASHBYTES 函数使用“SHA2_256”算法生成。错误消息给出了一个错误的主键示例,基本上是一系列空格。当我查看 Source 表时,看起来确实有多行,其中主键只是一系列空格。但我猜 Azure SQL 可以以某种方式区分它们,因为 Source 表上没有主键问题。只有当我尝试将数据复制到本地 MS SQL Server 数据库时,我才会遇到主键违规。

我尝试过的事情:

  1. 检查源和目标表列、数据库和服务器上的排序规则设置是否相同。

  2. 检查 SSIS 数据流源和目标的高级编辑器以确保代码页相同。

  3. 删除目标表上的主键约束,然后使用 SSIS 移动数据,然后运行查询以比较 SSIS 传输的哈希值与完全生成新哈希值。对于大约 8% 的表,SQL Server 认为传输的散列与新散列不匹配,尽管散列通常看起来相似。这是查询:

SELECT CONVERT(CHAR(32), HASHBYTES('SHA2_256', BusinessKey)), Hashkey, BusinessKey 
FROM TargetTable 
WHERE CONVERT(CHAR(32), HASHBYTES('SHA2_256', BusinessKey)) <> HashKey

基于#3,我的猜测是发生以下情况之一:

  1. SSIS 在将数据从源复制到目标时以某种方式转换哈希。

  2. 目标列/表/数据库具有一些设置,使其存储 char(32) 数据的方式与源列/表/数据库不同。

  3. 某种错误。

任何人有任何经验可以帮助阐明这个问题吗?

4

1 回答 1

1

CONVERT(CHAR(32), HASHBYTES('SHA2_256', BusinessKey))

会引起问题。它采用 32 字节哈希并将其作为 32 个代码点存储在一些 varchar 排序规则中。许多代码点是非打印字符,因此检查值将一直是个问题。

而且我不会相信 SSIS 或任何其他外部程序来回甚至显示这些值。许多编程环境使用以空值结尾的字符串,其中字节 0x00 表示字符串的结尾。并且您的哈希将在大约 12% 的时间中具有 0x00 代码点。P=1-(255/256)^32

将哈希存储为 BINARY(32) 或 Base64 编码字符串,您的问题应该会消失。

这是从我正在处理的示例中提取的一个小实用程序,它可能对您有用。我在存储在具有唯一约束的 char(32) 中的 200k 行表上使用哈希对其进行了测试,并且效果很好。

using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Text;

namespace Microsoft.Samples.SqlServerDataMover
{
    public class Program
    {

        public static void Main(string[] args)
        {
            var srcConnectionString = args[0];
            var destConnectionString = args[1];

            using (var src = new SqlConnection(srcConnectionString))
            using (var dest = new SqlConnection(destConnectionString))
            {
                src.Open();
                dest.Open();

                var cmdTables = src.CreateCommand();
                cmdTables.CommandText = "select schema_name(schema_id) schema_name, name table_name from sys.tables where type_desc = 'USER_TABLE'";

                var tables = new List<(string, string)>();
                using (var rdr = cmdTables.ExecuteReader())
                {
                    while (rdr.Read())
                    {
                        var schema = rdr.GetString(0);
                        var table = rdr.GetString(1);
                        tables.Add((schema, table));
                    }
                }

                foreach (var t in tables)
                {
                    CopyTable(src, dest, t.Item2, t.Item1);
                }

            }
        }
        static string QuoteName(string identifier)
        {
            var sb = new StringBuilder(identifier.Length + 3, 1024);
            sb.Append('[');
            foreach (var c in identifier)
            {
                if (c == ']')
                    sb.Append(']');
                sb.Append(c);
            }
            sb.Append(']');
            return sb.ToString();

        }

        public static void CopyTable(SqlConnection src, SqlConnection dest, string tableName, string schemaName, int batchSize = 10000)
        {
            var opts = SqlBulkCopyOptions.KeepIdentity | SqlBulkCopyOptions.KeepNulls | SqlBulkCopyOptions.TableLock;

            var sql = $"select * from {QuoteName(schemaName)}.{QuoteName(tableName)}";
            var cmd = src.CreateCommand();
            cmd.CommandTimeout = 0;
            cmd.CommandText = sql;

            using (var rdr = cmd.ExecuteReader())
            using (var bc = new SqlBulkCopy(dest, opts, null))
            {
                var schemaDt = rdr.GetSchemaTable();
                //schemaDt.WriteXml(Console.Out);

                var schema = new Dictionary<string, DataRow>();
                foreach (DataRow r in schemaDt.Rows)
                {
                    schema.Add(r[0].ToString(), r);
                }


                bc.BatchSize = batchSize;
                for (int i = 0; i < rdr.FieldCount; i++)
                {
                    var cn = rdr.GetName(i);
                    bool isreadonly = schema[cn].Field<bool>("IsReadOnly");

                    if (!isreadonly)
                    {
                        bc.ColumnMappings.Add(i, cn);
                    }

                }

                bc.NotifyAfter = 10000;

                bc.SqlRowsCopied += (s, a) =>
                {
                    Console.WriteLine($"[{schemaName}].[{tableName}] {a.RowsCopied} Rows Copied.");
                };

                bc.DestinationTableName = $"[{schemaName}].[{tableName}]";
                bc.WriteToServer(rdr);
                Console.WriteLine($"[{schemaName}].[{tableName}] Complete. {1} Rows Copied.");
            }

        }
    }
}

只需传入两个连接字符串即可运行它,首先是源,然后是目标。

例如

PS C:\temp\datamover> .\bin\debug\DataMover.exe "Server=xxxxxx.database.windows.net;database=adventureworks;User ID=xxxxxx;Password=xxxxxx" "Server=localhost;database=awcopy;integrated security=true"
于 2020-05-22T19:21:45.610 回答