1

我搜索了很多地方,很少使用论坛发帖......所以我相信我在这里遇到了一个罕见的问题。我有一个包含一对多关系的 csv 文件。比如说,前 10 个字段包含一条记录。这继续到一个空白字段,然后在每个附加字段中是另一个分隔符,它是一个包含字段的管道。这实际上是与单个呼叫相关的事件的日志文件。

例如。

ID;MACHINEID;AN​​I;; 命令|参数|20120112.06:15:32

粗体文本可以重复多次或几次。这是 SQL 连接的多方面。我需要把它恢复到 SQL 中。我在想 SSIS 脚本块。我在其中循环并从一侧添加到带有 ID 的多个表中。使用管道作为分隔符拆分内部字段。我只是想知道是否有人以前见过这个并且有一个我可能不知道的简单解决方案。我想这会更好地用 XML 创建,但事实并非如此。

任何帮助表示赞赏。如果它主要是建设性的,我什至会接受批评。提前非常感谢。

展示餐桌妆容

CREATE TABLE [dbo].[tblIVRCalls](
    [CALLID] [char](50) NOT NULL,
    [MACHINEID] [char](50) NOT NULL,
    [CHANNEL] [char](50) NOT NULL,
    [DNIS] [char](50) NOT NULL,
    [ANI] [char](50) NOT NULL,
    [STARTTIME] [smalldatetime] NOT NULL,
    [TRUNK] [char](50) NOT NULL,
    [APPLICATION] [char](50) NULL,
    [PLANID] [char](50) NULL,
    [DERIVEDID] [char](50) NULL,
    [TOTALTIME] [smalldatetime] NOT NULL,
 CONSTRAINT [PK_tblIVRCalls] PRIMARY KEY CLUSTERED 

CREATE TABLE [dbo].[IVRCallActions](
    [pk] [bigint] IDENTITY(1,1) NOT NULL,
    [fkCALLID] [char](50) NOT NULL,
    [SequenceNumber] [bigint] NOT NULL,
    [Command] [char](50) NOT NULL,
    [Arguments] [varchar](max) NOT NULL,
    [ExecutionTime] [datetime] NOT NULL,
 CONSTRAINT [PK_IVRCallActions] PRIMARY KEY CLUSTERED 
4

3 回答 3

0

一种选择是将数据导入临时表,其中重复的元素存储为 varchar(max)(或纯粹主义者的 nvarchar(max))。

然后执行以下操作:

  1. 将非重复字段插入表 1。
  2. 使用@@Identity 捕获刚刚插入的行的标识
  3. 循环遍历重复字段,首先提取行(由某个分隔符定义),然后是行的组成部分(由“|”定义)。
  4. 将每一行插入表 2,其标识来自 (2)。

我承认步骤(3)不是最简单的事情。但是,如果您编写存储过程并使用拆分函数(http://stackoverflow.com/questions/2647/split-string-in-sql),那么它是可行的。

以我的经验,复杂的字符串操作在数百兆字节内都可以正常工作。如果您需要一次处理多个 GB,那么您可能需要自定义代码。

于 2012-05-02T18:28:46.330 回答
0

我建议创建一个控制台程序(使用 C#、vbs 或您可以使用的任何东西)从原始文件中获取 2 个文件。

然后你可以在 SSIS 中处理这两个文件,它非常擅长管理空字段、更改数据类型等等。

该程序将简单地逐行读取原始文件,对其进行处理以获取父行和相关子行,并将父行写入一个文件,将子行写入另一个文件。这些文件将是 SSIS 包的条目。

不该做什么:

  • 在 T-SQL 中进行这种处理要困难得多,因为它没有太多允许处理字符串的函数。
  • 在 SSIS 中进行此操作是不可能的。没有任何组件可以将一行分解为几行。更不用说它应该产生两种不同类型的行。(一个自定义的 SSIS 组件可以做到这一点,但它不值得努力)
于 2012-05-02T23:09:49.223 回答
0

根据在SSIS 设计模式:加载可变长度行中找到的信息,我能够创建以下过程。SSIS 脚本块可以做任何你可以用 C# 做的事情,并且可以安装到 SQL Server 以按设定的时间间隔运行。为处理创建了 2 个输出。您通过此脚本为每个缓冲区填充。因此,一个平面文件包含一列。2个输出和这个脚本之间。

    public override void Input0_ProcessInputRow(Input0Buffer Row)
    {
        IVRCallsBuffer.AddRow();
        string[] splitEntireRow = null;
        var byteRow = Row.Column0.GetBlobData(0,(int)Row.Column0.Length);
        var entireRow = System.Text.Encoding.ASCII.GetString(byteRow);
        splitEntireRow = entireRow.Split(';');
        int fieldPosition = 0;
        string CallID = "";
        while (splitEntireRow[fieldPosition] != "")
        {
            var splitIVRCall = splitEntireRow[fieldPosition].Split('=');
            switch (splitIVRCall[0])
            {
                case "CALLID":
                    IVRCallsBuffer.CALLID = splitIVRCall[1];
                    CallID = splitIVRCall[1];
                    break;
                case "MACHINEID":
                    IVRCallsBuffer.MACHINEID = splitIVRCall[1];
                    break;
                case "CHANNEL":
                    IVRCallsBuffer.CHANNEL = splitIVRCall[1];
                    break;
                case "DNIS":
                    IVRCallsBuffer.DNIS = splitIVRCall[1];
                    break;
                case "ANI":
                    IVRCallsBuffer.ANI = splitIVRCall[1];
                    break;
                case "STARTTIME":
                    IVRCallsBuffer.STARTTIME = DateTime.ParseExact(splitIVRCall[1], "yyyyMMdd.HH:mm:ss.fff", System.Globalization.CultureInfo.CurrentUICulture);
                    break;
                case "TRUNK":
                    IVRCallsBuffer.TRUNK = splitIVRCall[1];
                    break;
                case "APPLICATION":
                    IVRCallsBuffer.Application = splitIVRCall[1];
                    break;
                case "PLANID":
                    IVRCallsBuffer.PLANID = splitIVRCall[1];
                    break;
                case "DERIVEDID":
                    IVRCallsBuffer.DERIVEDID = splitIVRCall[1];
                    break;
                case "TOTALTIME":
                    IVRCallsBuffer.TOTALTIME = DateTime.ParseExact(splitIVRCall[1],"mm:ss.fff",System.Globalization.CultureInfo.CurrentUICulture);
                    break;   
            }
            fieldPosition++;
        }
        fieldPosition++;
        uint sequence = 1;
        while (splitEntireRow.GetUpperBound(0) -1 > fieldPosition)
        {
            IVRCallActionsBuffer.AddRow();
            var splitIVRCallAction = splitEntireRow[fieldPosition].Split('|');
            IVRCallActionsBuffer.fkCALLID = CallID;
            IVRCallActionsBuffer.SequenceNumber = sequence;
            IVRCallActionsBuffer.Command = splitIVRCallAction[0];
            IVRCallActionsBuffer.Arguments = splitIVRCallAction[1];
            IVRCallActionsBuffer.ExecutionTime = DateTime.ParseExact(splitIVRCallAction[2],"yyyyMMdd.HH:mm:ss.fff", System.Globalization.CultureInfo.CurrentUICulture);
            fieldPosition++;
            sequence++;
        }

    }
}
于 2012-05-03T17:49:47.180 回答