最近,我的任务是创建一个自动化 ETL 流程:读取主映射文件,并根据平面文件名将数据导入对应的表中。我决定使用 SqlBulkCopy,起初一切似乎都很好。我实现了 IDataReader 接口来读取平面文件,并利用 SQL Server 的元数据获取列数,以便进行一对一的列映射。一切正常,直到我遇到包含空字符串的文件。SqlBulkCopy 抛出异常,提示“来自数据源的 String 类型的给定值无法转换为指定目标列的 int 类型。”。就这样,它根本不管该列的数据库类型是 INT NULL。我知道我可以进一步解析元数据,提取各列的数据类型,根据提取的信息构建 DataSet,再对平面文件中的数据重新进行类型转换,从而得到一个不错的强类型解决方案;但我是个懒人,感觉自己的幸福被微软——或者被我自己的无能——无情地撕碎了。如果有人知道如何解决我这个突如其来的问题,还请指教。感谢您的时间。
// Staging load driver.
// 1. Reads the staging job description (UnivStage) while connected to edwDBName.
// 2. Collects the flat files to load; exits the process when there are none.
// 3. Truncates the staging table, then bulk-copies each file into it inside
//    its own transaction while connected to stagingDBName.
// 4. Switches back to edwDBName to record the row count and log the run.
// Only SqlException is handled here; it is passed to RenderExceptionMessagesAndExit.
List<String> fileNames;
DateTime startJobTime = DateTime.Now;
Console.WriteLine("---------------------------------------------");
Console.WriteLine("Start Time: " + startJobTime);
Console.WriteLine("---------------------------------------------");
using (SqlConnection sqlCon = new SqlConnection(sqlConnection))
{
    try
    {
        sqlCon.Open();
        sqlCon.ChangeDatabase(edwDBName);

        // Get service information for staging job
        UnivStage us = GetStagingJobInfo(jobName, sqlCon);
        us.StartJobTime = startJobTime;

        // Get a list of file names
        fileNames = GetFileList(us, args);
        if (fileNames.Count == 0)
        {
            Console.WriteLine("No files to process.");
            Environment.Exit(0);
        }

        // Truncate Staging Table
        TruncateStagingTable(us, sqlCon);

        Console.WriteLine("Processing files: ");
        foreach (String fileName in fileNames)
        {
            Console.WriteLine(fileName);
        }
        Console.WriteLine();

        // Switch the already-open connection to the staging database; a
        // Close()/Open() round trip is not needed before ChangeDatabase.
        sqlCon.ChangeDatabase(stagingDBName);

        foreach (String filePath in fileNames)
        {
            // One transaction per file: if anything below throws before
            // Commit(), disposing the transaction rolls that file back.
            //
            // NOTE(review): the reported "given value of type String ... cannot
            // be converted to type int" failure on empty fields presumably comes
            // from FlatFileReader handing "" to SqlBulkCopy. Returning
            // DBNull.Value for empty fields from the reader's GetValue would let
            // nullable columns receive NULL. FlatFileReader is not visible
            // here - confirm.
            using (SqlTransaction sqlTran = sqlCon.BeginTransaction())
            using (FlatFileReader ffReader = new FlatFileReader(filePath, us.Delimiter))
            using (SqlBulkCopy sqlBulkCopy =
                new SqlBulkCopy(sqlCon, SqlBulkCopyOptions.Default, sqlTran))
            // Second connection handed to SetColumnList. Wrapping it in a using
            // guarantees it is released even when the column lookup or the bulk
            // copy throws (previously it was only closed on success).
            using (SqlConnection sqlCon2 = new SqlConnection(sqlConnection))
            {
                SetColumnList(sqlCon2, us, sqlBulkCopy);
                sqlBulkCopy.BatchSize = 1000;
                sqlBulkCopy.DestinationTableName =
                    us.StagingSchemaName + "." + us.StagingTableName;
                sqlBulkCopy.WriteToServer(ffReader);
                sqlTran.Commit();
            }
        }

        // Return to the EDW database for bookkeeping. The connection is kept
        // open from here on: re-opening a SqlConnection starts again from the
        // connection string's initial database, which would discard this
        // ChangeDatabase call before the helpers below run.
        sqlCon.ChangeDatabase(edwDBName);
        SetRowCount(us, sqlCon);
        us.EndJobTime = DateTime.Now;
        LogStagingProcess(us, sqlCon);

        // Release the connection before blocking on console input below.
        sqlCon.Close();

        Console.WriteLine(us.ProcessedRowCount + " rows inserted.");
        Console.WriteLine("---------------------------------------------");
        Console.WriteLine("Success! End Time: " + us.EndJobTime);
        Console.WriteLine("---------------------------------------------");
        Console.ReadLine();
    }
    catch (SqlException e)
    {
        RenderExceptionMessagesAndExit(e,
            "Exception have occured during an attempt to utilize SqlBulkCopy\n");
    }
}