0

我想使用 nifi 将文件从本地系统传输到 flink。我已经在 nifi 中配置了带有 GetFile 处理器和名称为“Data For Flink”的输出端口的管道。在 Flink 端,我将 flink-connector-nifi_2.11 与 flink 一起使用。下面是我在 Flink 中的 nifi 客户端代码。

         SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
             //   .url("https://localhost:9443/nifi")
                .url("http://localhost:8080/nifi")
                .portName("Data For Flink")
                .transportProtocol(SiteToSiteTransportProtocol.HTTP)
                .requestBatchCount(5)
                .buildConfig();

        SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
        DataStream<NiFiDataPacket> streamSource = streamExecEnv.addSource(nifiSource);
        DataStream<String> data = streamSource.map(new MapFunction<NiFiDataPacket, String>() {
            @Override
            public String map(NiFiDataPacket line) throws Exception {
                String row = new String(line.getContent(), Charset.defaultCharset());

                return row;
            }
        });
        data.print();
        streamExecEnv.execute(); 

当我运行上面的代码时,我没有得到任何输出以及任何错误。我的 Nifi 管道是否正确?我在这里错过了什么吗?

另外,我是否需要配置安全的 nifi 实例或任何站点到站点的属性才能将数据从 Nifi 传输到 Flink?

我也试过了,但我遇到了 ca 证书的问题。我已经用 tls-toolkit 生成了 PKCS12 证书。但是当我尝试将其上传到 java 信任库时,我收到错误消息 - 需要 x509 证书。

编辑 1:我在 IntelliJ IDEA 上运行上述代码,并在类路径上添加了 Flink 依赖项。

我是 NiFi 和 Flink 的新手。我很感激任何帮助!

4

0 回答 0