3

当我尝试使用esRDD("index")elasticsearch-spark 中的函数从 elasticsearch 读取数据时,我得到 type 中的结果org.apache.spark.rdd.RDD[(String, scala.collection.Map[String,AnyRef])]。当我检查值时,它们都是 type AnyRef。但是,我在ES 网站上看到,它说:

elasticsearch-hadoop 自动将 Spark 内置类型转换为 Elasticsearch 类型(并返回)

我的依赖是:

scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"  
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.0"  
libraryDependencies += "org.apache.spark" %% "spark-mllib" % "2.1.0"  
libraryDependencies += "org.elasticsearch" % "elasticsearch-spark-20_2.11" % "5.4.0"

我错过了什么吗?以及如何以方便的方式转换类型?

4

1 回答 1

1

好的,我找到了解决方案。如果使用esRDD,所有类型信息都会丢失。
我们最好使用:

val df = sparkSession.read.format("org.elasticsearch.spark.sql").option("es.read.field.as.array.include", "").load("index")

你可以在里面配置option,如果你之前做过,option可以忽略。

数据返回在 中DataFramesql.DataTypes只要转换受elasticsearch-spark.

现在你可以做任何你想做的事。

于 2017-07-06T09:44:27.533 回答