1

//转换导入org.apache.flink.api.scala._

val s1mmePulsarProp = new Properties(){
  put("service.url", "pulsar+ssl://xxx1:6651,xxx2:6651,xxx3:6651")
  //https://xxx1:8443,xxx2:8443,xxx3:8443
  put("admin.url", "https://xxx2:8443")
  //put("partitionDiscoveryIntervalMillis", "5000")
  put("startingOffsets", "latest")
  put("topic", "persistent://bigdatatest/bigdatatest-namespace/5g_xdr_s1mme_testp4")
}
//Authentication: certFilePath, keyFilePath
/*val tlsAuth: Authentication = AuthenticationFactory.TLS(
  "/usr/tools/pulsartls/bigdatatest.cert.pem",
  "/usr/tools/pulsartls/bigdatatest.key-pk8.pem")*/
val tlsAuthMap = new util.HashMap[String,String]()
tlsAuthMap.put("tlsCertFile", "/usr/tools/pulsartls/bigdatatest.cert.pem")
tlsAuthMap.put("tlsKeyFile", "/usr/tools/pulsartls/bigdatatest.key-pk8.pem")
val tlsAuth: Authentication = AuthenticationFactory.create(
  "org.apache.pulsar.client.impl.auth.AuthenticationTls",
  tlsAuthMap)

//ClientConfigurationData
val pulsarClientConf = new ClientConfigurationData
pulsarClientConf.setServiceUrl("pulsar+ssl://xxx1:6651,xxx2:6651,xxx3:6651")
pulsarClientConf.setUseTls(true)
pulsarClientConf.setTlsAllowInsecureConnection(true)
pulsarClientConf.setTlsTrustCertsFilePath("/usr/tools/pulsartls/ca.cert.pem")
//clientConf.setTlsHostnameVerificationEnable(false)
pulsarClientConf.setAuthentication(tlsAuth)
pulsarClientConf.setUseTcpNoDelay(true)

val s1mmePulsarConsumer = new FlinkPulsarSource("https://xxx.xxx.xxx.xxx:8443",
                                        pulsarClientConf,
                                        new SimpleStringSchema(),
                                        s1mmePulsarProp)
                               .setStartFromLatest()



val s1mmePulsarSource = env.addSource(s1mmePulsarConsumer)
  .filter(_.trim.nonEmpty)
  .assignTimestampsAndWatermarks(new timestampExtractor(8))
  .name("s1mmesource")

s1mmePulsarSource.addSink(hadoopSink("hdfs://encrypt_data/ninecon/s1mmepulsar/s1mme/",8))
  .name("s1mme2hdfs")

我只是写了这些代码,但是 flink 代理总是抛出一个错误: 原因:org.apache.pulsar.client.admin.PulsarAdminException$NotAuthorizedException: HTTP 401 Unauthorized

4

0 回答 0