1

我正在尝试完成一些在 SQL 中很容易完成的事情,但在不使用 SQL 的情况下在 SSIS 中完成似乎非常具有挑战性。基本上,我需要合并和连接一个多对一关系的字段。

给定实体:[合同项目](许多)到(一个)[帐户]

有一个字段 [ ari_productsummary ] 包含合同项目实体上列出的产品。我们想将该值作为 [ ari_activecontractitems ]写入帐户。但是,一个Account可能有多个与之关联的Contract Item记录,在这种情况下,我们希望连接这些值。我们也只希望连接不同的值(在我的数据流中已经解决了不同的行)。

这可以通过写入临时表来完成,然后使用查询或视图来获取汇总结果,如下所示。我创建了一个名为TESTTABLE的 SQL 表,其中包含来自Contract Item实体的 [ ari_productsummary ]以及引用 [ accountid ] 以将其映射回Account。然后我编写了以下查询作为视图:

SELECT distinct accountid,
        (SELECT TT2.ari_productsummary + '; ' 
                FROM TESTTABLE TT2
                WHERE TT2.accountid = TT.accountid
                FOR XML PATH ('')
            ) AS 'ari_activecontractitems'
FROM TESTTABLE TT

执行该查询为我提供了我想要的结果,然后我可以将其用于导入到Account实体中,如下所示:

SSIS 数据流,截断临时 sql 表,写入 sql 表,从视图中提取并写入对象

但是如何在 SSIS 数据流中执行此操作而不写入 SQL 表作为数据的临时占位符? 我想在一个数据流容器内完成整个过程,而不使用临时 SQL 表/视图。整个总结过程需要即时完成:

我正在尝试构建的不需要 SQL 临时表/视图的数据流模型

是否有人有不需要临时 SQL 表/视图/查询但完全包含在数据流中的解决方案?

我正在使用 VS 2017 和 KingswaySoft Dynamic CRM 365 ETL 工具集来开发我的解决方案/包。

4

2 回答 2

1

我们有一个即将发布的版本,其中包含一个组件,它可以完全实现您想要实现的目标,而无需编写自定义代码。该功能目前处于预览阶段,请与我们联系以获取该功能的私人访问权限。您可以在我们的网站上找到我们的联系信息。

更新 - 2020 年 6 月 5 日,由于我们的 2020 版本第 1 波,我们已在https://www.kingswaysoft.com/products/ssis-productivity-pack/上公开了这些组件。我们有两个组件可用服务于这种目的。组合组件将获取输入值并转换为 SSIS 列中的复合值。Decomposition 组件则相反,它将接受一个输入值并使用基于分隔符的文本拆分或 XML/JSON 数组拆分将其拆分为多行。

于 2020-05-29T16:58:48.553 回答
1

在这里吐球,因为我没有动态也没有自定义组件。

数据流 1 - 合约聚合

此数据流的目的是在您提供的优雅查询中复制您的逻辑并将其推送到缓存连接管理器中(请参阅末尾的 2008+ 注释)

KingswaySoft Dynamics Source -> Script Task -> Cache Transform

如果要保留排序,请在脚本任务之前执行。我将对 Script Task 采用的实现是它是完全阻塞的——也就是说,所有行必须在它可以发送之前到达。像 Merge Join 这样的任务只是部分阻塞,因为排序数据的要求意味着一旦您不再有当前项目的匹配项,您可以将其发送到管道中。

脚本任务将是异步转换。您将有两个输出列,您的密钥accountid和新的派生列ari_activecontractitems。该列可能需要很大 - 您最了解您的数据,但如果它是 Dynamics 中的 blob 类型(> 4k unicode 或 > 8k ascii 字符),那么您必须将数据类型定义为 DT_TEXT/DT_NTEXT

作为输入,您将从源中选择 accountid 和 ari_productsummary。

代码应该很简单。我们要将入站数据累积到字典中。

    //  member variable
    Dictionary<string, List<string>> accumulator;

PreProcess 方法,我们将在其中添加它以初始化我们的变量

    // initialize in PreProcess method
    accumulator = new Dictionary<string, List<string>>();

在 OnBufferRowSent 中(名称大约)

    // simulate the inbound queue
    // row_id would be something like Rows.row_id
    if (!accumulator.ContainsKey(row_id))
    {
        // Create an empty dictionary for our list
        accumulator.Add(row_id, new List<string>());
    }

    // add it if we don't have it
    if (!accumulator[row_id].Contains(invoice))
    {
        accumulator[row_id].Add(invoice);
    }

一旦你收到没有更多可用数据的信号,那就是你开始缓冲输出数据的时候。自动生成的代码将包含所有这些的占位符。

    // This is how we shove data out the pipe
    foreach(var kvp in accumulator)
    {
        // approximately thus
        OutputBuffer1.AddRow();
        OutputBuffer1.row_id = kvp.Key;
        OutputBuffer1.ari_productsummary = string.Join("; ",  kvp.Value);

    }
于 2020-05-29T02:41:02.737 回答