我正在使用 pyspark (1.6) 和 elasticsearch-hadoop (5.1.1)。我通过以下方式将我的数据从 elasticsearch 转换为 rdd 格式:
es_rdd = sc.newAPIHadoopRDD(
inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_read_conf)
这里的 es_read_conf 只是我的 ES 集群的字典,作为 sc 的 SparkContext 对象。这很好用,我得到了 rdd 对象。
我想将其转换为数据框
df = es_rdd.toDF()
但我得到了错误:
ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling
给 toDF 方法一个 sampleSize 会导致同样的错误。据我了解,这是因为 pyspark 无法确定每个字段的类型。我知道我的弹性搜索集群中有一些字段都是空的。
将其转换为数据框的最佳方法是什么?