0

我目前正在编写 ElasticSearch Nifi 处理器。对 ES 的单独插入/写入不是最优的,而是首选批处理文档。在 Nifi 处理器中跟踪(批处理)文档(FlowFiles)以及何时以一定数量将它们批处理的最佳方法是什么?我最关心的部分是ES不可用、宕机、网络分区等导致批处理无法成功。问题的主要观点是,Nifi 具有用于排队/背压等的内容存储。是否有一种首选方法可以使用它来确保在目的地关闭时不会丢失 FlowFiles?也许我应该以另一个处理器为例?

我查看了 Mongo 处理器、Merge 等,试图了解在处理器内部进行批处理的首选方法,但似乎找不到任何具体的东西。任何建议,将不胜感激。

很有可能我忽略了 Nifi 中的一些基本功能。我对这个平台还是很陌生。

谢谢!

4

1 回答 1

2

很好的问题和一个很常见的模式。这就是为什么我们有 ProcessSession 的概念。它允许您向外部端点发送零个或多个东西,并且只有在您知道它已被收件人确认后才提交。从这个意义上说,它提供了至少一次语义。如果您使用的协议支持两阶段提交样式语义,您可以非常接近难以捉摸的完全一次语义。您在此处询问的大部分细节将取决于目标系统 API 和行为。

apache 代码库中有一些示例揭示了执行此操作的方法。一种方法是,如果您可以在推送到目标系统之前生成事件的合并集合。取决于它的API。我认为 PutMongo 和 PutSolr 以这种方式运作(尽管这方面的专家需要权衡)。一个可能更像您正在寻找的示例可以在 PutSQL 中找到,它对成批的流文件进行操作以在单个事务中发送(在目标数据库上)。

https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ PutSQL.java

会关注这里,但可以通过 users@nifi.apache.org 获得更大的 NiFi 组的关注

谢谢乔

于 2015-11-26T19:25:36.860 回答