0

这是我第一次真正尝试 spark/scala,所以要温柔。

我在 HDFS 上有一个名为 test.json 的文件,我正在尝试使用 Spark 读取和索引该文件。我可以通过 SQLContext.jsonFile() 读取文件,但是当我尝试使用 SchemaRDD.saveToEs() 时,我收到了一个无效的 JSON 片段接收错误。我在想 saveToES() 函数实际上并没有格式化 json 中的输出,而只是发送 RDD 的值字段。

我究竟做错了什么?

火花 1.2.0

Elasticsearch-hadoop 2.1.0.BUILD-20150217

测试.json:

{"key":"value"}

火花壳:

import org.apache.spark.SparkContext._
import org.elasticsearch.spark._

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

val input = sqlContext.jsonFile("hdfs://nameservice1/user/mshirley/test.json")
input.saveToEs("mshirley_spark_test/test")

错误:

<snip>
org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [Bad Request(400) - Invalid JSON fragment received[["value"]][MapperParsingException[failed to parse]; nested: ElasticsearchParseException[Failed to derive xcontent from (offset=13, length=9): [123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 34, 118, 97, 108, 117, 101, 34, 93, 10]]; ]]; Bailing out..
<snip>

输入:

res2: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[6] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
PhysicalRDD [key#0], MappedRDD[5] at map at JsonRDD.scala:47

输入.printSchema():

root
 |-- key: string (nullable = true)
4

1 回答 1

2

https://github.com/elastic/elasticsearch-hadoop/issues/382

改变:

import org.elasticsearch.spark._

至:

import org.elasticsearch.spark.sql._
于 2015-07-30T23:36:53.357 回答