0

我需要使用 Azure 数据工厂中的数据流将 json 转换为 csv(或 SQL 表)或任何其他扁平结构。我需要从源json中获取某些层次结构中的属性名称和层次结构较低的子属性的值,并将它们作为列/行值添加到csv或任何其他扁平结构中。

源数据规则/约束

  1. 父级数据属性名称将动态更改(例如 ABCDataPoints、CementUse、CoalUse、ABCUseIndicators 名称是动态的)
  2. 层次结构始终与以下示例 json 中的相同。

我需要一些帮助来定义 Json 路径/表达式以获取名称 ABCDataPoints、CementUse、CoalUse、ABCUseIndicators 等。我能够弄清楚如何检索属性ValueValueDateValueScoreAsReported的值。

源数据结构:

{
"ABCDataPoints": {
    "CementUse": {
        "Value": null,
        "ValueDate": null,
        "ValueScore": null,
        "AsReported": [],
        "Sources": []
    },
    "CoalUse": {
        "Value": null,
        "ValueDate": null,
        "AsReported": [],
        "Sources": []
    }
},
"ABCUseIndicators": {
    "EnvironmentalControversies": {
        "Value": false,
        "ValueDate": "2021-03-06T23:22:49.870Z"
    },
    "RenewableEnergyUseRatio": {
        "Value": null,
        "ValueDate": null,
        "ValueScore": null
    }
},
"XYZDataPoints": {
    "AccountingControversiesCount": {
        "Value": null,
        "ValueDate": null,
        "AsReported": [],
        "Sources": []
    },
    "AdvanceNotices": {
        "Value": null,
        "ValueDate": null,
        "Sources": []
    }        
},
"XYXIndicators": {
    "AccountingControversies": {
        "Value": false,
        "ValueDate": "2021-03-06T23:22:49.870Z"
    },
    "AntiTakeoverDevicesAboveTwo": {
        "Value": 4,
        "ValueDate": "2021-03-06T23:22:49.870Z",
        "ValueScore": "0.8351945854483925"
    }     
}

}

预期的扁平结构 在此处输入图像描述

4

1 回答 1

1

背景:在与 Microsoft 的 ADF 专家多次通话后(我们的工作场所有 Microsoft/Azure 合作伙伴关系),他们得出结论,ADF 提供开箱即用的活动是不可能的,Dataflow 也不可能(尽管不需要使用数据流) 也不是展平特征。原因是 Dataflow/Flatten 仅展开 Array 对象,并且没有可用于选择属性名称的映射函数 - 自定义表达式处于内部 beta 测试中,并将在不久的将来在 PA 中。

结论/解决方案:我们基于与 Microsoft emps 的调用达成了一项协议,最终采用多种方法,但两者都需要自定义代码 - 如果没有自定义代码,使用开箱即用的活动是不可能的。

解决方案 1 :使用ADF 自定义活动根据要求使用一些代码进行展平。这样做的缺点是您需要使用外部计算(VM/Batch),支持的选项不是按需提供的。所以它有点贵,但如果有连续的流工作负载,效果最好。这种方法还可以持续监控输入源的大小是否不同,因为在这种情况下计算需要具有弹性,否则您将出现内存不足异常。

解决方案 2:仍然需要编写自定义代码 - 但在函数应用程序中。

  • 创建一个复制活动,其源为具有 Json 内容的文件(最好是存储帐户)。
  • 使用目标作为函数的休息端点(不作为函数活动,因为从 ADF 活动调用时它有 90 秒超时)
  • 函数应用程序将 Json 行作为输入并解析和展平。
  • 如果您使用上述方式,那么您可以扩展在每个请求中发送的行数以运行并扩展并行请求。
  • 该函数将根据需要对一个文件或多个文件进行展平并存储在 blob 存储中。
  • 管道将根据需要从那里继续。
  • 这种方法的一个问题是,如果范围中的任何一个失败,复制活动将重试,但它将再次运行整个过程。
于 2021-06-07T14:13:57.930 回答