1

我集成了两个工具 Apache NiFi 和 Apache Flink。NiFi 接收事件并将它们发送到 Flink,然后 Flink 在对同一个 NiFi 进行一些处理后将这些事件返回。我在 Flink 中为 Nifi 构建了源和接收器。整个过程有效,但是sink的性能很差(大约每秒10个事件)。

如果我移除接收器(仅打印输出),处理速度会高得多。

我发现我可以使用 更改接收器进程的并行度,这setParallelism()当然有帮助,但是基本吞吐量太低了。我也尝试使用requestBatchCount(1000),但没有任何改变。

可能我的问题与交易有关。在每个事件接收器等待关闭事务之后,但我不确定并且我不知道如何更改它,例如在一个事务中发送数百个事件。

我可以做些什么来提高水槽的性能?

Apache NiFi 流程

这是我的接收器定义:

SiteToSiteClientConfig sinkConfig = new SiteToSiteClient.Builder()
    .url("http://" + host + ":" + port + "/nifi")
    .portName("Data from Flink")
    .buildConfig();

outStream.addSink(new NiFiSink<String>(sinkConfig, new NiFiDataPacketBuilder<String>() {
    public NiFiDataPacket createNiFiDataPacket(String s, RuntimeContext ctx) {
        return new StandardNiFiDataPacket(s.getBytes(), new HashMap<>());
    }
}));

现在我正在使用最新版本的 Flink (1.5.1) 和 NiFi (1.7.1)

4

1 回答 1

1

Apache Flink 提供的 NiFiSink 正在为每个事件创建一个事务:

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java#L60- L67

这样做是为了使错误处理清晰,以便如果事务未能提交,它将在失败的特定事件的上下文中从调用方法中抛出异常。

您可以实现自己的自定义版本,允许在事务上调用提交之前发送许多事件,但我对 Flink 的了解不够,无法理解如果事务稍后提交失败并且您不再拥有这些事件会发生什么.

于 2018-07-25T13:02:52.733 回答