1

你好,先生,

在数据摄取模板中,我需要获取此属性,因为我有带有日期字段的数据

日期数据 12-07-2018 a 13-07-2018 b 14-07-2018 c 15-07-2018 d

在那,我想采取最新的,即 15-07-2018

如果日期字段有新数据 16-07-2018 e 那么我必须通过检查最后更新日期 15-07-2018 而不是从第一个 12-07-2018 检查来获得 16-07-2018

像这样,如果我得到 17-08-2108 f 那么必须通过检查最后一个新日期 16-07-2018 来得到 17-08-2018 ..

如何实现这一点,我必须在哪个处理器中进行修改或必须添加新属性 当提要再次运行时,它如何获取最新的水印并从那里工作

4

1 回答 1

0

我想到了两种可能的方法:

  1. 编写您自己的 Spark 应用程序,该应用程序将用于 ( ExecuteSparkJob) 读取被摄取的文件。在这种情况下,您会跟踪最大日期,并且当您完成摄取时,将其保存在某个地方。如果您在 HDP 世界中,那么简单的方法就是将最大日期插入 Hive(事务)表。您还可以利用 ZooKeeper znode 来持久化,甚至PutDistributedMapCache是 NiFi 提供的处理器。
  2. 编写一个自定义的 NiFi 处理器,它基本上可以做与上述相同的事情,除了你必须自己启用它来处理不同格式的数据(CSV、JSON)。在这方面,Spark 内置了许多东西。
于 2018-07-17T04:08:54.853 回答