6

我正在使用 Kafka Streams (v0.10.0.1) 编写一个应用程序,并希望使用查找数据来丰富我正在处理的记录。该数据(带时间戳的文件)每天(或每天 2-3 次)写入 HDFS 目录。

如何在Kafka Streams应用程序中加载它并加入实际KStream
当新文件到达那里时,从 HDFS 重新读取数据的最佳做法是什么?

还是将Kafka ConnectRDBMS 表内容切换到所有 Kafka Streams 应用程序实例都可以使用的 Kafka 主题并将其写入会更好?

更新
正如建议的Kafka Connect将是要走的路。因为查找数据每天都会在 RDBMS 中更新,所以我考虑将 Kafka Connect 作为计划的一次性作业运行,而不是保持连接始终打开。是的,因为语义和保持连接始终打开并确保它不会被中断的开销......等等。对我来说,在这种情况下进行预定提取看起来更安全。

查找数据不大,可能会删除/添加/修改记录。我也不知道如何始终将完整转储到 Kafka 主题并截断以前的记录。启用日志压缩并为已删除的键发送空值可能不起作用,因为我不知道源系统中已删除什么。另外 AFAIK 当压缩发生时我没有控制权。

4

1 回答 1

3

推荐的方法确实是将查找数据也摄取到 Kafka 中——例如通过 Kafka Connect——正如您在上面自己建议的那样。

但是在这种情况下,我如何安排 Connect 作业每天运行,而不是连续从源表中获取,这在我的情况下是不必要的?

也许您可以更新您不希望连续运行 Kafka Connect 作业的问题?您是否关心资源消耗(数据库上的负载),如果不是“每日更新”,您是否关心处理的语义,或者......?

更新: 正如建议的 Kafka Connect 将是要走的路。因为查找数据每天都会在 RDBMS 中更新,所以我考虑将 Kafka Connect 作为计划的一次性作业运行,而不是保持连接始终打开。是的,因为语义和保持连接始终打开并确保它不会被中断的开销......等等。对我来说,在这种情况下进行预定提取看起来更安全。

Kafka Connect安全的,并且 JDBC 连接器的构建正是为了以稳健、容错和高性能的方式将 DB 表馈入 Kafka(已经有许多生产部署)。所以我建议不要仅仅因为“看起来更安全”而回退到“批量更新”模式;就个人而言,我认为触发每日摄取在操作上不如让它运行以进行连续(和实时!)摄取更方便,而且它还会为您的实际用例带来一些不利因素(请参阅下一段)。

但是,当然,您的里程可能会有所不同——所以如果您打算每天只更新一次,那就去吧。但是您失去了 a) 在扩充发生的时间点使用最新数据库数据扩充传入记录的能力,并且相反,b) 您实际上可能会使用陈旧/旧数据来扩充传入记录,直到下一天更新已完成,这很可能会导致您向下游发送的数据不正确/可供其他应用程序使用以供使用。例如,如果客户更新了她的送货地址(在数据库中),但您每天只将此信息提供给您的流处理应用程序(可能还有许多其他应用程序)一次,那么订单处理应用程序会将包裹运送到错误的位置地址,直到下一次每日摄取完成。

查找数据不大,可能会删除/添加/修改记录。我也不知道如何始终将完整转储到 Kafka 主题并截断以前的记录。启用日志压缩并为已删除的键发送空值可能不起作用,因为我不知道源系统中已删除什么。

Kafka Connect 的 JDBC 连接器已经为您自动处理:1. 它确保数据库插入/更新/删除正确反映在 Kafka 主题中,以及 2. Kafka 的日志压缩确保目标主题不会超出范围. 也许您可能想阅读文档中的 JDBC 连接器以了解您可以免费获得哪些功能:http: //docs.confluent.io/current/connect/connect-jdbc/docs/

于 2016-09-05T09:01:07.390 回答