2

在我们的遗留架构中,我们有一个 MS SQL 服务器数据库,该数据库几乎实时存储所有传感器信息,平均每秒接收 100 条记录。为了获得有关传感器事件的完整信息,我们需要加入数据库中有 2 到 3 个表。

Sample Query:

SELECT SOMETHING
FROM TABLE1 AS tab1 
INNER JOIN TABLE2 AS tab2 ON tab1.UpdateID=tab2.ID 
INNER JOIN TABLE3 as tab3 ON tab1.TagID=tab3.ID 
WHERE tab2.UpdateTime > ${lastExtractUnixTime}

我们的要求是每 1 分钟获取一次上述查询的捕获数据变化,并将记录发布到 Kafka。

暂时我正在使用 Spark Core JDBC 进行 CDC,处理记录,发送到 Kafka 并将 CDC 信息与${lastExtractUnixTime}HBase 一起作为 Phoenix 表进行维护。作业安排为每 1 分钟的批处理间隔。

作为一个长期的解决方案,我们计划使用 Apache Nifi 来做 CDC 的事情并将信息发布到 Kafka,Spark 流将从 Kafka 读取消息,将在其上应用一些业务逻辑并将丰富的数据发送到另一个 Kafka 主题; 我没有找到合适的处理器,这将帮助我动态地传递${lastExtractUnixTime}in SQL 并每 1 或 2 分钟获取一次增量记录。

请建议如何使用 Apache Nifi 完成此操作。

4

0 回答 0