
I have a case very much like this one:

Sample dataframe:

from pyspark.sql.types import *
schema = StructType([  # schema
    StructField("id", StringType(), True),
    StructField("email", ArrayType(StringType()), True)])
df = spark.createDataFrame([{"id": "id1"},
                            {"id": "id2", "email": None},
                            {"id": "id3","email": ["email1@gmail.com"]},
                            {"id": "id4", "email": ["email1@gmail.com", "email2@gmail.com"]}],
                           schema=schema)
df.show(truncate=False)
+---+------------------------------------+
|id |email                               |
+---+------------------------------------+
|id1|null                                |
|id2|null                                |
|id3|[email1@gmail.com]                  |
|id4|[email1@gmail.com, email2@gmail.com]|
+---+------------------------------------+

I want to insert this data into Elasticsearch, and from what I have researched I have to convert it to the indexing format first:

def parseTest(r):
    if r['email'] is None:
        return r['id'],{"id":r['id']}
    else:
        return r['id'],{"id":r['id'],"email":r['email']}
df2 = df.rdd.map(lambda row: parseTest(row))
df2.top(4)
[('id4', {'email': ['email1@gmail.com', 'email2@gmail.com'], 'id': 'id4'}),
 ('id3', {'email': ['email1@gmail.com'], 'id': 'id3'}),
 ('id2', {'id': 'id2'}),
 ('id1', {'id': 'id1'})]

Then I try to insert it:

es_conf = {"es.nodes" : "node1.com,node2.com",
           "es.resource": "index/type"}
df2.saveAsNewAPIHadoopFile(
    path='-', 
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_conf)

And I get:

org.apache.spark.SparkException: Data of type java.util.ArrayList cannot be used

Spark v 2.1.0
ES v 2.4.4

It works fine without the email field. I found some suggested solutions using es.output.json: true together with json.dumps, but they seemed to apply to version 5, so I tried it on another cluster running ES v5:

df3 = df2.map(json.dumps)
df3.top(4)
['["id4", {"email": ["email1@gmail.com", "email2@gmail.com"], "id": "id4"}]',
 '["id3", {"email": ["email1@gmail.com"], "id": "id3"}]',
 '["id2", {"id": "id2"}]',
 '["id1", {"id": "id1"}]']
es_conf2 = {"es.nodes" : "anothernode1.com,anothernode2.com",
           "es.output.json": "true",
           "es.resource": "index/type"}
df3.saveAsNewAPIHadoopFile(
    path='-', 
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_conf2)

And then I get:

RDD element of type java.lang.String cannot be used

Spark v 2.1.0
ES v 5.2.0

Feels bad, man.
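My guess is that this second error is structural rather than a version problem: json.dumps was applied to the whole (key, value) tuple, so every RDD element became a single string, while saveAsNewAPIHadoopFile expects key-value pairs. A minimal sketch that keeps the pair shape (just my assumption, not verified on this cluster; df4 is an illustrative name):

import json

# Serialize only the document part so the RDD keeps its (key, value) shape;
# dumping the whole tuple turns each element into a bare string.
df4 = df2.mapValues(json.dumps)
df4.top(4)
# expected shape, analogous to the output above:
# [('id4', '{"email": ["email1@gmail.com", "email2@gmail.com"], "id": "id4"}'),
#  ('id2', '{"id": "id2"}'), ...]

With the pairs preserved, the same saveAsNewAPIHadoopFile call could be retried; note that the ES-Hadoop setting documented for documents that are already serialized as JSON is es.input.json rather than es.output.json, which may also be relevant here.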


1 Answer


I found another way to do the same job, by using the write method of the dataframe object.

So, after the first section:

from pyspark.sql.types import *
schema = StructType([  # schema
    StructField("id", StringType(), True),
    StructField("email", ArrayType(StringType()), True)])
df = spark.createDataFrame([{"id": "id1"},
                            {"id": "id2", "email": None},
                            {"id": "id3","email": ["email1@gmail.com"]},
                            {"id": "id4", "email": ["email1@gmail.com", "email2@gmail.com"]}],
                           schema=schema)
df.show(truncate=False)
+---+------------------------------------+
|id |email                               |
+---+------------------------------------+
|id1|null                                |
|id2|null                                |
|id3|[email1@gmail.com]                  |
|id4|[email1@gmail.com, email2@gmail.com]|
+---+------------------------------------+

You just need:

df.write\
    .format("org.elasticsearch.spark.sql")\
    .option("es.nodes","node1.com,node2.com")\
    .option("es.resource","index/type")\
    .option("es.mapping.id", "id")\
    .save()

No need to convert to an RDD or modify anything.
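The es.mapping.id option makes the connector use the id column as the Elasticsearch document _id, which replaces the manual key built in parseTest above. If you want to sanity-check what was written, the same connector can read the index back into a dataframe. A quick sketch with the same placeholder nodes and index, assuming the elasticsearch-spark jar is already on the classpath (as it must be for the write above):

# Read the documents back from the same index to verify the write
df_check = (spark.read
    .format("org.elasticsearch.spark.sql")
    .option("es.nodes", "node1.com,node2.com")
    .option("es.resource", "index/type")
    .load())
df_check.show(truncate=False)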

Answered 2017-02-10T16:50:52.480