I am trying to write data from Spark Structured Streaming 2.2.1 to Elasticsearch (ES) 6.1.1, which has X-Pack installed, using ES-Hadoop. This is my code (the index already exists in Elasticsearch):

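import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

// Read each line of the files under `path` as a streaming text source
val exceptions = spark
  .readStream
  .text(path)
// Configure the Elasticsearch sink (ES-Hadoop Spark SQL data source)
val advancedQuery = exceptions
  .writeStream
  .format("org.elasticsearch.spark.sql")
  .trigger(Trigger.ProcessingTime(10.seconds))
  .option("checkpointLocation", "/checkpoint")
// Start writing to the resource "spark/exc" (index/type) and block until the query stops
val runningQuery = advancedQuery.start("spark/exc")
runningQuery.awaitTermination()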

But I get the following exception:

org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
    at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:327)
    at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:575)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:58)
    at org.elasticsearch.spark.sql.streaming.EsStreamQueryWriter.run(EsStreamQueryWriter.scala:41)
    at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink$$anonfun$addBatch$2$$anonfun$2.apply(EsSparkSqlStreamingSink.scala:52)
    at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink$$anonfun$addBatch$2$$anonfun$2.apply(EsSparkSqlStreamingSink.scala:51)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
**Caused by: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: missing authentication token for REST request [/]
null**

How do I set the required authentication data?

1 Answer

Figured it out: two additional options need to be added, "es.net.http.auth.user" and "es.net.http.auth.pass", as follows:

val advancedQuery = exceptions
  ...
  .option("es.net.http.auth.user", "*your elastic user goes here*")
  .option("es.net.http.auth.pass", "*your elastic password goes here*")
  ...
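
To avoid hard-coding the credentials in the job, the two options could also be assembled from the environment. The following is only a minimal sketch under stated assumptions: `EsAuthOptions`, `fromEnv`, and the environment variable names `ES_USER` and `ES_PASS` are hypothetical names chosen for illustration; only the two `es.net.http.auth.*` keys come from the answer above.

```scala
// Sketch: build the two ES-Hadoop authentication options from environment variables.
// EsAuthOptions, ES_USER and ES_PASS are hypothetical names, not part of ES-Hadoop.
object EsAuthOptions {
  // `env` defaults to the process environment; a plain Map can be passed for testing.
  def fromEnv(env: Map[String, String] = sys.env): Map[String, String] = {
    // Fail fast with a readable message when a credential is missing.
    val user = env.getOrElse("ES_USER", sys.error("ES_USER is not set"))
    val pass = env.getOrElse("ES_PASS", sys.error("ES_PASS is not set"))
    Map(
      "es.net.http.auth.user" -> user,
      "es.net.http.auth.pass" -> pass
    )
  }
}
```

The resulting map could then be handed to the stream writer in one call, for example `.options(EsAuthOptions.fromEnv())`, in place of the two separate `.option(...)` calls.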
answered 2018-01-19T17:45:06.413