0

我的最终目标是将数据从 hdfs 插入到 elasticsearch 但我面临的问题是连接性

我可以使用以下 curl 命令连接到我的 elasticsearch 节点

curl -u username -X GET https://xx.xxx.xx.xxx:9200/_cat/indices?v' --insecure

但是当谈到与火花的连接时,我无法这样做。我插入数据的命令是 df.write.mode("append").format('org.elasticsearch.spark.sql').option("es.net.http.auth.user", "username").option("es.net.http.auth.pass", "password").option("es.index.auto.create","true").option('es.nodes', 'https://xx.xxx.xx.xxx').option('es.port','9200').save('my-index/my-doctype')

我得到的错误是

org.elastisearch.hadoop.EsHadoopIllegalArgumentException:Cannot detect ES version - typical this happens if then network/Elasticsearch cluster is not accessible or when targetting a Wan/Cloud instance without the proper setting 'es.nodes.wan.only'
....
....
Caused by: org.elasticseach.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proy settings)- all nodes failed; tried [[xx.xxx.xx.xxx:9200]]
....
...

在这里,pyspark 相当于 curl --insecure

谢谢

4

3 回答 3

1

经过多次尝试和不同的配置选项。我找到了一种方法如何不安全地连接在 https 上运行的 elastisearch

        dfToEs.write.mode("append").format('org.elasticsearch.spark.sql') \
        .option("es.net.http.auth.user", username) \
        .option("es.net.http.auth.pass", password) \
        .option("es.net.ssl", "true") \
        .option("es.net.ssl.cert.allow.self.signed", "true") \
        .option("mergeSchema", "true") \
        .option('es.index.auto.create', 'true') \
        .option('es.nodes', 'https://{}'.format(es_ip)) \
        .option('es.port', '9200') \
        .option('es.batch.write.retry.wait', '100s') \
        .save('{index}/_doc'.format(index=index))

(es.net.ssl, true)

我们还必须提供如下自签名证书

(es.net.ssl.cert.allow.self.signed, true)
于 2020-08-18T13:08:18.787 回答
0

我确实检查了很多东西,最后我可以在 AWS ElasticSearch 服务 (ES) 中编写,但使用 scala/spark。

  1. 在 VPC 中,创建安全组以使用端口 443 从 EMR 访问 ES(ES 中的入站规则到 EMR 的 SG,以及 EMR 中的入站规则到同一端口)
  2. 使用 telnet 命令检查来自 EMR 主节点的连接
    telnet xyz.eu-west-1.es.amazonaws.com 443
  1. 完成上述检查后,使用 curl 命令检查应用程序级别

    curl https://xyz.eu-west-1.es.amazonaws.com:443/domainname/_search?pretty=true&?q=*```
    
  2. 之后,转到代码,在我的情况下,我使用 spark-shell 进行了测试,但服务器配置文件包含在 start 中,如下所示:

     spark-shell --jars elasticsearch-spark-20_2.11-7.1.1.jar --conf spark.es.nodes="xyz.eu-west-1.es.amazonaws.com" --conf spark.es.port=443 --conf spark.es.nodes.wan.only=true --conf spark.es.nodes.discovery="false" --conf spark.es.index.auto.create="true" --conf spark.es.resource="domain/doc" --conf spark.es.scheme="https"
    
    1. 最后要写的代码:
    import java.util.Date
    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
    import org.elasticsearch.spark._
    import org.elasticsearch.spark.sql._
    val dateformat =  new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
    val currentdate = dateformat.format(new Date)
    val colorsDF = spark.read.json("multilinecolors.json")
    val mcolors = colorsDF.withColumn("Date",lit(currentdate))
    mcolors.write.mode("append").format("org.elasticsearch.spark.sql").option("es.net.http.auth.user", "").option("es.net.http.auth.pass", "").option("es.net.ssl", "true").option("es.net.ssl.cert.allow.self.signed", "true").option("mergeSchema", "true").option("es.index.auto.create", "true").option("es.nodes","https://xyz.eu-west-1.es.amazonaws.com").option("es.port", "443").option("es.batch.write.retry.wait", "100").save("domainname/_doc")```
    
    
    
    
    
    
    
于 2021-05-04T13:20:53.290 回答
0

你可以试试下面的 sparkConfs,

val sparkConf = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.es.index.auto.create", "true")
.set("spark.es.nodes", "yourESaddress")
.set("spark.es.port", "9200")
.set("spark.es.net.http.auth.user","")
.set("spark.es.net.http.auth.pass", "")
.set("spark.es.resource", indexName)
.set("spark.es.nodes.wan.only", "true")

你仍然面临问题,es.net.ssl = true然后看看。

如果仍然出现错误,请尝试添加以下配置,

'es.resource' = 'ctrl_rater_resumen_lla/hb',
'es.nodes' = 'localhost',
'es.port' = '9200',
'es.index.auto.create' = 'true',
'es.index.read.missing.as.empty' = 'true',
'es.nodes.discovery'='true',
'es.net.ssl'='false'
'es.nodes.client.only'='false',
'es.nodes.wan.only' = 'true'
'es.net.http.auth.user'='xxxxx',
'es.net.http.auth.pass' = 'xxxxx'
'es.nodes.discovery' = 'false'

于 2020-08-10T12:46:44.810 回答