我正在研究应该通过 kafka 写入 hdfs 的项目。假设有在线服务器将消息写入 kafka。每条消息都包含时间戳。我想根据消息中的时间戳创建一个输出将是文件/文件的作业。例如如果kafka中的数据是
{"ts":"01-07-2013 15:25:35.994", "data": ...}
...
{"ts":"01-07-2013 16:25:35.994", "data": ...}
...
{"ts":"01-07-2013 17:25:35.994", "data": ...}
我想将 3 个文件作为输出
kafka_file_2013-07-01_15.json
kafka_file_2013-07-01_16.json
kafka_file_2013-07-01_17.json
当然,如果我再次运行这项工作并且队列中有一条新消息,例如
{"ts":"01-07-2013 17:25:35.994", "data": ...}
它应该创建一个文件
kafka_file_2013-07-01_17_2.json // second chunk of hour 17
我看过一些开源,但其中大多数从 kafka 读取到一些 hdfs 文件夹。这个问题的最佳解决方案/设计/开源是什么