2

我对 Kafka 不太熟悉,但我想知道从 Kafka 批量读取数据的最佳方法是什么,以便我可以使用 Elasticsearch Bulk Api 更快、更可靠地加载数据。

顺便说一句,我正在为我的 Kafka 消费者使用 Vertx

谢谢,

4

3 回答 3

4

我不知道这是否是最好的方法,但是当我开始寻找类似的功能时,我找不到任何现成的框架。我发现了这个项目:

https://github.com/reachkrishnaraj/kafka-elasticsearch-standalone-consumer/tree/branch2.0

并开始为它做出贡献,因为它没有做我想做的一切,而且也不容易扩展。现在 2.0 版本非常可靠,我们公司在生产中使用它每天处理/索引 300M+ 事件。

这不是自我推销:) - 只是分享我们如何做同类型的工作。当然,现在可能还有其他选择。

于 2015-11-14T03:57:58.373 回答
1

https://github.com/confluentinc/kafka-connect-elasticsearch

或者你可以试试这个来源

https://github.com/reachkrishnaraj/kafka-elasticsearch-standalone-consumer

作为标准 Jar 运行

**1。将代码下载到 $INDEXER_HOME 目录中。

**2。cp $INDEXER_HOME/src/main/resources/kafka-es-indexer.properties.template /your/absolute/path/kafka-es-indexer.properties 文件 - 按照评论中的说明更新所有相关属性

**3。cp $INDEXER_HOME/src/main/resources/logback.xml.template /your/absolute/path/logback.xml

指定要存储日志的目录:

根据需要调整最大大小和日志文件数量的值

**4。构建/创建应用程序 jar(确保已安装 MAven):

cd $INDEXER_HOME
mvn clean package

kafka-es-indexer-2.0.jar 将在 $INDEXER_HOME/bin 中创建。所有依赖项都将放入 $INDEXER_HOME/bin/lib。所有 JAR 依赖项都通过 kafka-es-indexer-2.0.jar 清单链接。

**5。编辑您的 $INDEXER_HOME/run_indexer.sh 脚本: -- 如果需要,使其可执行(chmod a+x $INDEXER_HOME/run_indexer.sh) -- 根据您的环境更新标有“更改您的环境”注释的属性

**6。运行应用程序 [使用 JDK1.8]:

./run_indexer.sh
于 2017-04-19T10:56:08.697 回答
0

我使用了火花流,这是一个使用 Scala 的非常简单的实现。

于 2016-03-18T13:35:48.787 回答