我集成了两个工具 Apache NiFi 和 Apache Flink。NiFi 接收事件并将它们发送到 Flink,然后 Flink 在对同一个 NiFi 进行一些处理后将这些事件返回。我在 Flink 中为 Nifi 构建了源和接收器。整个过程有效,但是sink的性能很差(大约每秒10个事件)。
如果我移除接收器(仅打印输出),处理速度会高得多。
我发现我可以使用 更改接收器进程的并行度,这setParallelism()当然有帮助,但是基本吞吐量太低了。我也尝试使用requestBatchCount(1000),但没有任何改变。
可能我的问题与交易有关。在每个事件接收器等待关闭事务之后,但我不确定并且我不知道如何更改它,例如在一个事务中发送数百个事件。
我可以做些什么来提高水槽的性能?
这是我的接收器定义:
SiteToSiteClientConfig sinkConfig = new SiteToSiteClient.Builder()
.url("http://" + host + ":" + port + "/nifi")
.portName("Data from Flink")
.buildConfig();
outStream.addSink(new NiFiSink<String>(sinkConfig, new NiFiDataPacketBuilder<String>() {
public NiFiDataPacket createNiFiDataPacket(String s, RuntimeContext ctx) {
return new StandardNiFiDataPacket(s.getBytes(), new HashMap<>());
}
}));
现在我正在使用最新版本的 Flink (1.5.1) 和 NiFi (1.7.1)
