4

我不知道如何使用来自 spark 的 python 将数据帧写入 elasticsearch。我按照这里的步骤进行操作。

这是我的代码:

# Read file
df = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .options(header='true') \
    .load('/vagrant/data/input/input.csv', schema = customSchema)

df.registerTempTable("data")

# KPIs
kpi1 = sqlContext.sql("SELECT * FROM data")

es_conf = {"es.nodes" : "10.10.10.10","es.port" : "9200","es.resource" : "kpi"}
kpi1.rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_conf)

上面的代码给出

引起:net.razorvine.pickle.PickleException:ClassDict 构造的预期零参数(对于 pyspark.sql.types._create_row)

我还从以下位置启动脚本: spark-submit --master spark://aggregator:7077 --jars ../jars/elasticsearch-hadoop-2.4.0/dist/elasticsearch-hadoop-2.4.0.jar /vagrant/scripts/aggregation.py以确保elasticsearch-hadoop已加载

4

3 回答 3

4

对于初学者来说,saveAsNewAPIHadoopFile期望一RDD(key, value),在您的情况下,这可能只是偶然发生。同样的事情也适用于您声明的值格式。

我对 Elastic 不熟悉,但仅基于您可能应该尝试类似以下的论点:

kpi1.rdd.map(lambda row: (None, row.asDict()).saveAsNewAPIHadoopFile(...)

由于 Elastic-Hadoop 提供 SQL 数据源,您还应该能够跳过它并直接保存数据:

df.write.format("org.elasticsearch.spark.sql").save(...)
于 2016-09-18T20:41:27.627 回答
1

正如 zero323 所说,将 Dataframe 从 PySpark 加载到 Elasticsearch 的最简单方法是使用该方法

Dataframe.write.format("org.elasticsearch.spark.sql").save("index/type")  
于 2017-10-06T10:30:25.270 回答
0

你可以使用这样的东西:

df.write.mode('overwrite').format("org.elasticsearch.spark.sql").option("es.resource", '%s/%s' % (conf['index'], conf['doc_type'])).option("es.nodes", conf['host']).option("es.port", conf['port']).save()
于 2020-11-17T15:44:06.313 回答