4

我正在为我们的 ETL 流程评估 Nifi。我想构建以下流程:从 SQL 数据库中获取大量数据 -> 拆分为块,每个块 1000 条记录 -> 计算每个块中的错误记录 -> 计算错误记录的总数 -> 如果超过阈值失败过程 - > 否则将每个块保存到数据库。

我无法解决的问题是如何等到所有块都得到验证。例如,如果我有 5 个验证任务同时工作,我需要某种屏障来等待所有块都被处理,然后才运行错误计数处理器,因为我不想保存无效数据并在达到阈值时将其删除.

我的另一个问题是,是否有可能在多个节点上并行运行此验证处理器,并且仍然有可能等到它们全部完成。

4

2 回答 2

2

对此的一种解决方案是将ExecuteScript处理器用作“安全阀”,以在内存中保存一个简单的计数,该计数由第一次收到具有特定属性值的流文件触发(存储在本地/集群状态,基本上是键映射)attribute-valuecount)。一旦该值达到阈值,您可以生成一个新的流文件以路由到包含已完成的属性值的成功关系。在这种情况下,将其他结果(需要批处理的流文件)发送到MergeContent处理器并将最小批处理大小设置为您喜欢的任何值。阀门的后续处理器应将其调度策略设置为,Event Driven以便它仅在从阀门接收到流文件时运行。

于 2016-11-12T01:55:30.900 回答
1

在分布式 MapCache 中更新计数不是正确的方法,因为 fetch 和 update 是分开的,并且不能在仅增加计数的原子处理器中进行。

http://apache-nifi-users-list.2361937.n4.nabble.com/How-do-I-atomically-increment-a-variable-in-NiFi-td1084.html

于 2017-05-31T10:19:41.750 回答