请告诉我在数据转换模板中添加提交发布高水位标记的过程,就像我们在数据摄取中所做的那样,将水位标记值设置为高。因为我需要获取增量/更新数据。
我通过阅读文档得到了这个解决方案,但不知道如何实际去做。最终我的要求是在数据转换中获取增量数据和新数据。
如果您对此有意见,请给我回复
请告诉我在数据转换模板中添加提交发布高水位标记的过程,就像我们在数据摄取中所做的那样,将水位标记值设置为高。因为我需要获取增量/更新数据。
我通过阅读文档得到了这个解决方案,但不知道如何实际去做。最终我的要求是在数据转换中获取增量数据和新数据。
如果您对此有意见,请给我回复
您的模板需要以 LoadHighWaterMark 处理器开始。这用于将当前的高水位标记值放入流文件属性中。在由 ReleaseHighWarkMark 处理器更新之前,初始值为 null。LoadHighWaterMark 处理器还阻止馈送执行,直到高水位线被释放。
然后,您需要将 ReleaseHighWaterMark 处理器连接到流程中的每个失败关系,并将另一个 ReleaseHighWaterMark 处理器连接到流程中的最终成功关系。重要的是每个流文件都以 ReleaseHighWaterMark 处理器结束,因为 LoadHighWaterMark 处理器会阻塞,直到当前流文件被释放。
ReleaseHighWaterMark 处理器上的 Mode 属性指示是保存新的高水位标记值还是保留现有值。最终成功关系的 ReleaseHighWaterMark 处理器应将 Mode 设置为 Commit,以便保存新的高水位标记值。所有故障关系的 ReleaseHighWaterMark 处理器都应将 Mode 设置为 Reject,以便保留现有的高水位标记值。
有关更多信息,请参阅 Kylo 文档:http: //kylo.readthedocs.io/en/latest/how-to-guides/NiFiProcessorsDocs.html#high-water-mark-processors