11

我正在研究应该通过 kafka 写入 hdfs 的项目。假设有在线服务器将消息写入 kafka。每条消息都包含时间戳。我想根据消息中的时间戳创建一个输出将是文件/文件的作业。例如如果kafka中的数据是

 {"ts":"01-07-2013 15:25:35.994", "data": ...}
 ...    
 {"ts":"01-07-2013 16:25:35.994", "data": ...}
 ... 
 {"ts":"01-07-2013 17:25:35.994", "data": ...}

我想将 3 个文件作为输出

  kafka_file_2013-07-01_15.json
  kafka_file_2013-07-01_16.json
  kafka_file_2013-07-01_17.json 

当然,如果我再次运行这项工作并且队列中有一条新消息,例如

 {"ts":"01-07-2013 17:25:35.994", "data": ...}

它应该创建一个文件

  kafka_file_2013-07-01_17_2.json // second  chunk of hour 17

我看过一些开源,但其中大多数从 kafka 读取到一些 hdfs 文件夹。这个问题的最佳解决方案/设计/开源是什么

4

5 回答 5

7

Camus API您绝对应该从linkedIn 中查看实现。Camus 是 LinkedIn 的 Kafka->HDFS 管道。这是一个 mapreduce 作业,可以从 Kafka 中加载分布式数据。查看我为一个简单的示例编写的这篇文章,该示例从推特流中获取并基于推文时间戳写入 HDFS。

项目在 github 上可用 - https://github.com/linkedin/camus

Camus 需要两个主要组件来读取和解码来自 Kafka 的数据以及将数据写入 HDFS –</p>

解码从 Kafka 读取的消息

Camus 有一组解码器,有助于解码来自 Kafka 的消息,解码器基本上扩展com.linkedin.camus.coders.MessageDecoder了它实现基于时间戳对数据进行分区的逻辑。此目录中存在一组预定义的解码器,您可以根据这些编写自己的解码器。 camus/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/

将消息写入 HDFS

Camus 需要一组 RecordWriterProvider 类,这些类com.linkedin.camus.etl.RecordWriterProvider将告诉 Camus 应该写入 HDFS 的有效负载是什么。此目录中存在一组预定义的 RecordWriterProvider,您可以根据这些编写自己的。

camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common
于 2015-02-19T05:50:14.583 回答
2

如果您使用的是 Apache Kafka 0.9 或更高版本,则可以使用 Kafka Connect API。

查看https://github.com/confluentinc/kafka-connect-hdfs

这是一个 Kafka 连接器,用于在 Kafka 和 HDFS 之间复制数据。

于 2017-02-09T08:50:21.467 回答
2

如果您正在寻找更实时的方法,您应该查看StreamSets Data Collector。它也是一个 Apache 许可的用于摄取的开源工具。

HDFS 目标可配置为根据您指定的模板写入基于时间的目录。它已经包含了一种在传入消息中指定字段的方法,用于确定应该写入消息的时间。该配置称为“时间基础”,您可以指定类似${record:value("/ts")}.

*完全披露我是一名工程师,正在研究这个工具。

于 2015-11-11T00:10:29.800 回答
1

检查此内容以了解从 Kafka 到 HDFS 的连续摄取。由于它依赖于Apache Apex,因此它具有 Apex 提供的保证。

https://www.datatorrent.com/apphub/kafka-to-hdfs-sync/

于 2016-11-15T01:28:39.360 回答
0

结帐加缪: https ://github.com/linkedin/camus

不过,这将以 Avro 格式写入数据……其他 RecordWriters 是可插入的。

于 2013-07-10T00:09:11.513 回答