1

我正在尝试使用 XML 文件和 SqlBulkCopy 插入数据。目标表是一个时间序列表,设置如下

-- Generic time-series store: a series is identified by ObjectId + [Type] + [Source].
-- ObjectId intentionally has no foreign key because it may point at a row in any table.
-- NOTE(review): [Type] and [Source] are varchar(4), yet the sample XML in this post uses
-- longer values ('Temperature', 'Met Office') - confirm whether short codes are expected.
create table TimeSeries
(
    Id         uniqueidentifier not null constraint DF_TimeSeries_Id default (newid()),
    ObjectId   uniqueidentifier not null,
    [Date]     datetime         not null,
    Value      float(53)        not null,
    [Type]     varchar(4)       not null,
    [Source]   varchar(4)       not null,
    LastUpdate datetime         not null constraint DF_TimeSeries_LastUpdate default (getdate()),
    TypeIndex  smallint         not null constraint DF_TimeSeries_TypeIndex default (0),
    -- One row per timestamp / object / type / source / type index.
    constraint PK_TimeSeries primary key clustered
    (
        [Date]    asc,
        ObjectId  asc,
        [Type]    asc,
        [Source]  asc,
        TypeIndex asc
    )
    with (fillfactor = 80)
);

go
-- Index keyed by object, then type, date and source; Value is carried as an
-- included column so it can be read from the index leaf.
create nonclustered index IX_TimeSeries_ObjectId_Type_Date_Source
on TimeSeries
(
    ObjectId asc,
    [Type]   asc,
    [Date]   asc,
    [Source] asc
)
include (Value)
with (fillfactor = 80);


go
-- Index keyed by object then date (regardless of type/source); Value is carried
-- as an included column.
create nonclustered index IX_TimeSeries_ObjectId_Date
on TimeSeries
(
    ObjectId asc,
    [Date]   asc
)
include (Value)
with (fillfactor = 80);

go
-- Beacons whose measurements are stored in TimeSeries (TimeSeries.ObjectId = BeaconId).
-- NOTE(review): units of LocationX / LocationY / Altitude are not stated; the seed rows
-- look like latitude/longitude in degrees and altitude in metres - confirm.
create table Beacons
(
    -- Default is named explicitly, matching the DF_<table>_<column> convention used for TimeSeries.
    BeaconId      uniqueidentifier not null constraint DF_Beacons_BeaconId default (newid()),
    [Description] varchar(50)      not null,
    LocationX     float            not null,
    LocationY     float            not null,
    Altitude      float            not null,
    -- Table-level constraint: the separating comma after the last column is now explicit.
    constraint PK_Beacons primary key clustered (BeaconId)
);
go
-- NOTE(review): BeaconId is already the clustered primary key (PK_Beacons), so this
-- nonclustered index on the same single column looks redundant - confirm before keeping it.
create nonclustered index IX_Beacons
    on Beacons (BeaconId asc);

go
-- Snow gauges whose measurements are stored in TimeSeries (TimeSeries.ObjectId = SnowGaugeId).
create table SnowGauges
(
    -- Default is named explicitly, matching the DF_<table>_<column> convention used for TimeSeries.
    SnowGaugeId   uniqueidentifier not null constraint DF_SnowGauges_SnowGaugeId default (newid()),
    [Description] varchar(50)      not null,
    -- Table-level constraint: the separating comma after the last column is now explicit.
    constraint PK_SnowGauges primary key clustered (SnowGaugeId)
);
go
-- NOTE(review): SnowGaugeId is already the clustered primary key (PK_SnowGauges), so this
-- nonclustered index on the same single column looks redundant - confirm before keeping it.
create nonclustered index IX_SnowGauges
    on SnowGauges (SnowGaugeId asc);


go
-- Seed two beacons and one snow gauge; their ids are generated by the newid() defaults.
insert into Beacons ([Description], LocationX, LocationY, Altitude)
values
    ('Dunkery',   51.162, -3.586, 519),
    ('Prestwich', 53.527, -2.279, 76);

insert into SnowGauges ([Description])
values ('Val d''Isère');

-- Display the generated ids so they can be copied into the XML file.
select BeaconId, [Description], LocationX, LocationY, Altitude from Beacons;
select SnowGaugeId, [Description] from SnowGauges;

如您所见,我想在 TimeSeries 中存储任何类型的时间序列。这可以是温度、压力、生物数据等……无论如何,我可以通过唯一标识符、来源和类型来识别时间序列。ObjectId 中没有设置外键,因为这个唯一标识符可以引用任何表。

在这个脚本的最后,我插入了 2 个信标(beacon)和 1 个雪量计(snow gauge),我想为它们填充时间序列。用于此操作的 XML 文件格式如下:

<?xml version="1.0" encoding="utf-8" ?>
<!-- Sample rows for the TimeSeries table. ObjectId values must match the ids generated
     by the setup script (Beacons / SnowGauges) and may need updating after running it.
     NOTE(review): Date is not in XML (ISO 8601) format; the values look like
     dd/MM/yyyy HH:mm:ss - confirm before parsing. -->
<TimeSeries>
<TimeSeries ObjectId="186CA33E-AC1C-4220-81DE-C7CD32F40C1A" Date="09/06/2013 07:00:00" Value="9.2" Source = "Met Office" Type = "Temperature"/>
<TimeSeries ObjectId="186CA33E-AC1C-4220-81DE-C7CD32F40C1A" Date="09/06/2013 10:00:00" Value="8.8" Source = "Met Office" Type = "Temperature"/>
<TimeSeries ObjectId="186CA33E-AC1C-4220-81DE-C7CD32F40C1A" Date="09/06/2013 13:00:00" Value="8.7" Source = "Met Office" Type = "Temperature"/>
<TimeSeries ObjectId="186CA33E-AC1C-4220-81DE-C7CD32F40C1A" Date="09/06/2013 07:00:00" Value="1" Source = "Met Office" Type = "UV"/>
<TimeSeries ObjectId="186CA33E-AC1C-4220-81DE-C7CD32F40C1A" Date="09/06/2013 10:00:00" Value="3" Source = "Met Office" Type = "UV"/>
<TimeSeries ObjectId="186CA33E-AC1C-4220-81DE-C7CD32F40C1A" Date="09/06/2013 13:00:00" Value="5" Source = "Met Office" Type = "UV"/>
<TimeSeries ObjectId="AFB81E51-18B0-4696-9C2F-E6E9EEC1B647" Date="09/06/2013 07:00:00" Value="5.8" Source = "Met Office" Type = "Temperature"/>
<TimeSeries ObjectId="AFB81E51-18B0-4696-9C2F-E6E9EEC1B647" Date="09/06/2013 10:00:00" Value="6.3" Source = "Met Office" Type = "Temperature"/>
<TimeSeries ObjectId="AFB81E51-18B0-4696-9C2F-E6E9EEC1B647" Date="09/06/2013 13:00:00" Value="6.5" Source = "Met Office" Type = "Temperature"/>
<TimeSeries ObjectId="50E52A2B-D719-4341-A451-110D0874D26D" Date="07/06/2013 00:00:00" Value="80.5" Source = "Meteo France" Type = "SnowMeter"/>
<TimeSeries ObjectId="50E52A2B-D719-4341-A451-110D0874D26D" Date="08/06/2013 00:00:00" Value="80.5" Source = "Meteo France" Type = "SnowMeter"/>
</TimeSeries>

如果您运行第一个脚本,您可能会有不同的 ObjectId,并且必须在 XML 文件中更新它们。所以从那里开始,一切都应该是直截了当的,一个简单的 C# 程序应该可以完成插入数据的工作。现在让我们看一下 C# 代码:

using System;
using System.Data;
using System.Data.SqlClient;
using System.IO;

namespace XMLBulkInsert
{
    /// <summary>
    /// Console program that loads the TimeSeries XML file into a <see cref="DataTable"/>
    /// and bulk-copies it into the TimeSeries table.
    /// </summary>
    /// <remarks>
    /// The table produced by <c>DataSet.ReadXml</c> hands ObjectId to SqlBulkCopy as a
    /// String, so <c>WriteToServer</c> fails with "cannot be converted to type
    /// uniqueidentifier" - this is the failure the question is about. The message is
    /// caught and printed below; it is deliberately not worked around here.
    /// </remarks>
    class Program
    {
        const string XMLFILE_PATH = @"C:\Workspaces\Ws1\R\TimeSeries\TimeSeries.xml";
        const string CONNECTION_STRING = @"Server=RISK1;Database=DevStat;Trusted_Connection=True;";

        /// <summary>
        /// Reads the XML file, maps the five data columns onto the destination table and
        /// attempts the bulk insert, printing any exception message to the console.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            DataTable sourceData;

            Console.Write("Read file... ");
            // Dispose the reader as soon as the file is loaded, even when ReadXml throws.
            using (StreamReader xmlFile = new StreamReader(XMLFILE_PATH))
            {
                DataSet ds = new DataSet();
                ds.ReadXml(xmlFile);
                sourceData = ds.Tables[0];
            }
            Console.WriteLine("Done !");

            using (SqlConnection sourceConnection = new SqlConnection(CONNECTION_STRING))
            {
                sourceConnection.Open();

                // Reuse the connection opened above; passing the connection string instead
                // makes SqlBulkCopy open a second connection and leaves this one unused.
                using (SqlBulkCopy bulkCopy = new SqlBulkCopy(sourceConnection))
                {
                    // Only these columns come from the file; Id, LastUpdate and TypeIndex
                    // are filled in by the table's default constraints.
                    bulkCopy.ColumnMappings.Add("ObjectId", "ObjectId");
                    bulkCopy.ColumnMappings.Add("Date", "Date");
                    bulkCopy.ColumnMappings.Add("Value", "Value");
                    bulkCopy.ColumnMappings.Add("Source", "Source");
                    bulkCopy.ColumnMappings.Add("Type", "Type");
                    bulkCopy.DestinationTableName = "TimeSeries";

                    try
                    {
                        Console.Write("Insert data... ");
                        bulkCopy.WriteToServer(sourceData);
                    }
                    catch (Exception ex)
                    {
                        // Best-effort demo program: report the failure and carry on.
                        Console.WriteLine(ex.Message);
                    }
                    // No explicit Close() calls: the using blocks dispose the bulk copy
                    // and the connection on every path.
                }
            }

            Console.WriteLine("Insertion completed, please Press Enter...");
            Console.ReadLine();
        }

    }
}

运行此程序会返回以下异常:“来自数据源的 String 类型的给定值无法转换为指定目标列的类型 uniqueidentifier。”在设置列映射时,似乎没有办法强制将该列作为 uniqueidentifier 处理。我甚至尝试加入这行代码:ds.Tables[0].Columns[0].DataType = typeof(Guid); 但没有成功——一旦表中已有行数据,.NET 就不允许更改列的类型。

我原本对 SqlBulkCopy 抱有很高的期望,但现在感觉有点卡住了。我有数以百万计的 XML 格式的数据,却因为这个 uniqueidentifier 列而一行也无法插入。

有谁知道如何设置此类以接受唯一标识符?

4

1 回答 1

2

鉴于评论中提到大约有 3 亿行数据,我建议放弃 DataTable;您不会想一次性把所有数据加载到内存中。理想的做法是逐个元素地解析,并将数据以 IDataReader 的形式公开。

幸运的是,存在一些用于此的实用程序。首先,让我们解析出数据。每一行本质上是:

/// <summary>
/// One parsed &lt;TimeSeries .../&gt; row from the XML file; each property corresponds
/// to the attribute of the same name.
/// </summary>
class TimeSeries
{
    /// <summary>Identifier of the object the measurement belongs to (ObjectId attribute).</summary>
    public Guid ObjectId { get; set; }
    /// <summary>Timestamp of the measurement (Date attribute).</summary>
    public DateTime Date { get; set; }
    /// <summary>Origin of the measurement (Source attribute).</summary>
    public string Source { get; set; }
    /// <summary>Kind of measurement (Type attribute).</summary>
    public string Type { get; set; }
    // NOTE(review): the destination column is float(53) while this is decimal - confirm
    // the conversion is acceptable for the expected value range.
    /// <summary>Measured value (Value attribute).</summary>
    public decimal Value { get; set; }
}

我们可以编写一个基于元素的阅读器,例如:

/// <summary>
/// Lazily parses the child elements of the root &lt;TimeSeries&gt; element, yielding one
/// <see cref="TimeSeries"/> per child without loading the whole document.
/// </summary>
/// <param name="source">Reader positioned at the start of the XML document.</param>
/// <returns>
/// A deferred sequence; the XML is only read as the sequence is enumerated, so
/// <paramref name="source"/> must stay open until enumeration has finished.
/// </returns>
static IEnumerable<TimeSeries> ReadTimeSeries(TextReader source)
{
    using (var reader = XmlReader.Create(source, new XmlReaderSettings {
                     IgnoreWhitespace = true }))
    {
        reader.MoveToContent();
        // ReadStartElement consumes the root start tag and leaves the reader ON the
        // first child. The loop therefore inspects the current node before advancing;
        // calling Read() first (as the previous version did) skipped the first row.
        reader.ReadStartElement("TimeSeries");
        while (reader.MoveToContent() == XmlNodeType.Element && reader.Depth == 1)
        {
            // XNode.ReadFrom materialises just this element and leaves the reader on
            // the node that follows it, so no extra Read() is needed per iteration.
            var el = (XElement) XNode.ReadFrom(reader);
            yield return new TimeSeries
            {
                ObjectId = (Guid) el.Attribute("ObjectId"),
                // note: datetime is not xml format; need to parse - this
                // should probably be more explicit (DateTime.Parse uses the
                // current culture to decide between day/month orderings)
                Date = DateTime.Parse((string) el.Attribute("Date")),
                Source = (string) el.Attribute("Source"),
                Type = (string) el.Attribute("Type"),
                Value = (decimal) el.Attribute("Value")
            };
        }
    }
}

请注意,这是一个“迭代器块”(iterator block),它是惰性求值的——数据按需逐条读取,不会一次性全部加载。

接下来,我们需要一个能够接收 IEnumerable<T> 并将其公开为 IDataReader 的 API——FastMember 正是这样做的(此外还有许多其他功能)。所以我们可以这样写:

// Stream the parsed rows into the destination table: ObjectReader (FastMember) exposes
// the lazy sequence as an IDataReader, which SqlBulkCopy consumes directly.
using (var bcp = new SqlBulkCopy(connection) { DestinationTableName = "SomeTable" })
{
    var rows = ReadTimeSeries(source);
    using (var objectReader = ObjectReader.Create(rows))
    {
        bcp.WriteToServer(objectReader);
    }
}

其中 source 是一个 TextReader,例如来自 File.OpenText:

// Same pipeline, with the TextReader opened from a file path. The file stays open for
// the whole bulk copy because the rows are parsed lazily while WriteToServer runs.
using (var source = File.OpenText(path))
using (var bcp = new SqlBulkCopy(connection))
{
    bcp.DestinationTableName = "SomeTable";

    var series = ReadTimeSeries(source);
    using (var objectReader = ObjectReader.Create(series))
    {
        bcp.WriteToServer(objectReader);
    }
}

如果要控制列顺序,可以使用 bcp.ColumnMappings——但也许更方便的做法是在 IDataReader 内部处理:

// The member names are arguments of ObjectReader.Create (they select which properties
// are exposed as columns, and in what order); ReadTimeSeries only takes the TextReader.
using(var objectReader = ObjectReader.Create(
    ReadTimeSeries(source), "ObjectId", "Date", "Value" /* etc */))
{
    bcp.DestinationTableName = "SomeTable";
    bcp.WriteToServer(objectReader);
}

我在自己的一些代码中也使用了这种方式——即使数据确实能放进内存,它的性能也优于通过 DataTable 的方式。

不过,关键是我们现在可以控制正在发生的事情。

于 2013-06-10T14:29:37.507 回答