这是我第一次真正尝试 spark/scala,所以要温柔。
我在 HDFS 上有一个名为 test.json 的文件,我正在尝试使用 Spark 读取和索引该文件。我可以通过 SQLContext.jsonFile() 读取文件,但是当我尝试使用 SchemaRDD.saveToEs() 时,我收到了一个无效的 JSON 片段接收错误。我在想 saveToES() 函数实际上并没有格式化 json 中的输出,而只是发送 RDD 的值字段。
我究竟做错了什么?
火花 1.2.0
Elasticsearch-hadoop 2.1.0.BUILD-20150217
测试.json:
{"key":"value"}
火花壳:
import org.apache.spark.SparkContext._
import org.elasticsearch.spark._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
val input = sqlContext.jsonFile("hdfs://nameservice1/user/mshirley/test.json")
input.saveToEs("mshirley_spark_test/test")
错误:
<snip>
org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [Bad Request(400) - Invalid JSON fragment received[["value"]][MapperParsingException[failed to parse]; nested: ElasticsearchParseException[Failed to derive xcontent from (offset=13, length=9): [123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 34, 118, 97, 108, 117, 101, 34, 93, 10]]; ]]; Bailing out..
<snip>
输入:
res2: org.apache.spark.sql.SchemaRDD =
SchemaRDD[6] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
PhysicalRDD [key#0], MappedRDD[5] at map at JsonRDD.scala:47
输入.printSchema():
root
|-- key: string (nullable = true)