I am trying to write from Spark Structured Streaming on IBM Analytics Engine to an IBM Compose Elasticsearch sink. My Spark code:
dataDf
  .writeStream
  .outputMode(OutputMode.Append)
  .format("org.elasticsearch.spark.sql")
  .queryName("ElasticSink")
  .option("checkpointLocation", s"${s3Url}/checkpoint_elasticsearch")
  .option("es.nodes", "xxx1.composedb.com,xxx2.composedb.com")
  .option("es.port", "xxxx")
  .option("es.net.http.auth.user", "admin")
  .option("es.net.http.auth.pass", "xxxx")
  .option("es.net.ssl", true)
  .option("es.nodes.wan.only", true)
  .option("es.net.ssl.truststore.location", SparkFiles.getRootDirectory() + "/my.jks")
  .option("es.net.ssl.truststore.pass", "xxxx")
  .start("test/broadcast")
However, I get the following exception:
org.elasticsearch.hadoop.EsHadoopException: Could not get a Transport from the Transport Pool for host [xxx2.composedb.com:xxxx]
at org.elasticsearch.hadoop.rest.pooling.PooledHttpTransportFactory.borrowFrom(PooledHttpTransportFactory.java:106)
at org.elasticsearch.hadoop.rest.pooling.PooledHttpTransportFactory.create(PooledHttpTransportFactory.java:55)
at org.elasticsearch.hadoop.rest.NetworkClient.selectNextNode(NetworkClient.java:99)
at org.elasticsearch.hadoop.rest.NetworkClient.<init>(NetworkClient.java:82)
at org.elasticsearch.hadoop.rest.NetworkClient.<init>(NetworkClient.java:59)
at org.elasticsearch.hadoop.rest.RestClient.<init>(RestClient.java:94)
at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:317)
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:576)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:58)
at org.elasticsearch.spark.sql.streaming.EsStreamQueryWriter.run(EsStreamQueryWriter.scala:41)
at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink$$anonfun$addBatch$2$$anonfun$2.apply(EsSparkSqlStreamingSink.scala:52)
at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink$$anonfun$addBatch$2$$anonfun$2.apply(EsSparkSqlStreamingSink.scala:51)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Any ideas?