1

我目前正在使用 Nifi 使用数据来读取 Tealium 事件流并加载到 HDFS 中。当源未发送属性数据时需要帮助过滤数据。

{"account":"newtv","twitter:description":"发现您最喜欢的 NewTV 节目和主持人的播放时间。","og:locale":"en_US","dcterms:publisher":"NewTV", "original-source":"www.newtv.com/","og:url":"www.newtv.com/show/program-guide"}},"post_time":"2019-10-09 11:27 :46","useragent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36","event_id":"12345"}

上面的消息示例。当源错过从以下示例数据集中发送 event_id 属性的数据时,我目前坚持过滤数据。

当前 Nifi 流程,使用 Kafka -> 评估 Json 路径 -> Jolttransform Json -> 评估 Json 路径 -> RouteOnAttribute -> 合并内容 -> 评估 Json 路径 -> 更新属性 -> PutHDFS -> MoveHDFS

需要帮助如何使用 RouteOnAttribute 拆分数据,以将缺少的 event_id 属性或 attribute_value 区分为两个不同的流。带有属性或属性值和缺失值的流会出错并加载到不同的输出路径中。

4

1 回答 1

1

EvaluateJsonPath处理器中添加新属性以event_id从流文件中提取值。

如果流文件没有,event_id则 nifi 向属性添加空值。

EvaluateJsonPath Configs: 在此处输入图像描述

然后通过使用RouteOnAttribute处理器,我们可以检查属性值并相应地路由流文件。

RouteOnAttribute Configs:

非空值

${event_id:isEmpty():not()}

空值

${event_id:isEmpty()}

在此处输入图像描述

然后使用空值而不是空值关系进行进一步处理..!!

于 2019-10-16T02:05:07.650 回答