我想使用 nifi 将文件从本地系统传输到 flink。我已经在 nifi 中配置了带有 GetFile 处理器和名称为“Data For Flink”的输出端口的管道。在 Flink 端,我将 flink-connector-nifi_2.11 与 flink 一起使用。下面是我在 Flink 中的 nifi 客户端代码。
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
// .url("https://localhost:9443/nifi")
.url("http://localhost:8080/nifi")
.portName("Data For Flink")
.transportProtocol(SiteToSiteTransportProtocol.HTTP)
.requestBatchCount(5)
.buildConfig();
SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
DataStream<NiFiDataPacket> streamSource = streamExecEnv.addSource(nifiSource);
DataStream<String> data = streamSource.map(new MapFunction<NiFiDataPacket, String>() {
@Override
public String map(NiFiDataPacket line) throws Exception {
String row = new String(line.getContent(), Charset.defaultCharset());
return row;
}
});
data.print();
streamExecEnv.execute();
当我运行上面的代码时,我没有得到任何输出以及任何错误。我的 Nifi 管道是否正确?我在这里错过了什么吗?
另外,我是否需要配置安全的 nifi 实例或任何站点到站点的属性才能将数据从 Nifi 传输到 Flink?
我也试过了,但我遇到了 ca 证书的问题。我已经用 tls-toolkit 生成了 PKCS12 证书。但是当我尝试将其上传到 java 信任库时,我收到错误消息 - 需要 x509 证书。
编辑 1:我在 IntelliJ IDEA 上运行上述代码,并在类路径上添加了 Flink 依赖项。
我是 NiFi 和 Flink 的新手。我很感激任何帮助!