1

我在使用特定的 spark 方法时遇到问题,saveAsNewAPIHadoopFile. 上下文是我正在使用 pyspark,将具有 1k、10k、50k、500k、1m 记录的 RDD 索引到 ElasticSearch (ES) 中。

由于各种原因,Spark 上下文在使用 2gb 驱动程序和单个 2gb 执行程序时的性能非常低下。

直到大约 500k,当我遇到 java 堆大小问题时,我都没有问题。增加到大约4gb spark.driver.memory,我可以索引更多。但是,它的工作时间是有限制的,我们希望索引超过 500k、1m、5m、20m 的记录。

由于各种原因,也受限于使用 pyspark。瓶颈和断点似乎是一个名为 的火花阶段take at SerDeUtil.scala:233,无论 RDD 进入多少个分区,它都会下降到一个,我假设这是驱动程序收集分区并准备索引。

现在 - 我想知道是否有一种有效的方法仍然使用如下方法,考虑到该约束:

to_index_rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={
        "es.resource":"%s/record" % index_name,
        "es.nodes":"192.168.45.10:9200",
        "es.mapping.exclude":"temp_id",
        "es.mapping.id":"temp_id",
    }
)

为了寻求好的解决方案,我还不如晾一些脏衣服。我有一个非常低效的解决方法,用于zipWithIndex分块 RDD,并将这些子集发送到上面的索引函数。看起来有点像这样:

def index_chunks_to_es(spark=None, job=None, kwargs=None, rdd=None, chunk_size_limit=10000):

    # zip with index
    zrdd = rdd.zipWithIndex()

    # get count
    job.update_record_count(save=False)
    count = job.record_count

    # determine number of chunks
    steps = count / chunk_size_limit
    if steps % 1 != 0:
            steps = int(steps) + 1

    # evenly distribute chunks, while not exceeding chunk_limit
    dist_chunk_size = int(count / steps) + 1

    # loop through steps, appending subset to list for return
    for step in range(0, steps):

        # determine bounds
        lower_bound = step * dist_chunk_size
        upper_bound = (step + 1) * dist_chunk_size
        print(lower_bound, upper_bound)

        # select subset
        rdd_subset = zrdd.filter(lambda x: x[1] >= lower_bound and x[1] < upper_bound).map(lambda x: x[0])

        # index to ElasticSearch
        ESIndex.index_job_to_es_spark(
            spark,
            job=job,
            records_df=rdd_subset.toDF(),
            index_mapper=kwargs['index_mapper']
        )

如果我理解正确的话,它会很慢,因为zipWithIndex,filtermap会针对每个生成的 RDD 子集进行评估。 但是,它的内存效率也很高,因为 500k、1m、5m 等记录永远不会被发送到saveAsNewAPIHadoopFile这些较小的 RDD,相对较小的火花驱动器可以处理。

对于不同方法的任何建议将不胜感激。也许这意味着现在使用Elasticsearch-Hadoop连接器,而是将原始 JSON 发送到 ES?

更新:

看起来我仍然在使用此解决方法时遇到 java 堆空间错误,但离开这里是为了演示对可能解决方法的思考。没想到zipWithIndex会收集驱动程序上的所有内容(我假设这里就是这种情况)

更新#2

这是我尝试运行的 RDD 的调试字符串saveAsNewAPIHadoopFile

(32) PythonRDD[6] at RDD at PythonRDD.scala:48 []
 |   MapPartitionsRDD[5] at javaToPython at NativeMethodAccessorImpl.java:-2 []
 |   MapPartitionsRDD[4] at javaToPython at NativeMethodAccessorImpl.java:-2 []
 |   ShuffledRowRDD[3] at javaToPython at NativeMethodAccessorImpl.java:-2 []
 +-(1) MapPartitionsRDD[2] at javaToPython at NativeMethodAccessorImpl.java:-2 []
    |  MapPartitionsRDD[1] at javaToPython at NativeMethodAccessorImpl.java:-2 []
    |  JDBCRDD[0] at javaToPython at NativeMethodAccessorImpl.java:-2 []

更新#3

下面是take at SerDeUtil.scala:233似乎运行的 DAG 可视化driver/localhost

在此处输入图像描述

还有一个 DAG 用于saveAsNewAPIHadoopFile更小的工作(大约 1k 行),因为 500k 行的尝试从未真正触发,因为SerDeUtil上面的阶段似乎触发了较大 RDD 的 java 堆大小问题:

在此处输入图像描述

4

1 回答 1

1

对于为什么这解决了问题,我仍然有点困惑,但确实如此。当从 MySQL 读取行时spark.jdbc.read,通过传递边界,生成的 RDD 似乎以saveAsNewAPIHadoopFile适合大型 RDD 的方式进行分区。

有一个用于 DB 行的 Django 模型,因此可以获得第一列和最后一列 ID:

records = records.order_by('id')
start_id = records.first().id
end_id = records.last().id

然后,将它们传递给spark.read.jdbc

sqldf = spark.read.jdbc(
    settings.COMBINE_DATABASE['jdbc_url'],
    'core_record',
    properties=settings.COMBINE_DATABASE,
    column='id',
    lowerBound=bounds['lowerBound'],
    upperBound=bounds['upperBound'],
    numPartitions=settings.SPARK_REPARTITION
)

RDD 的调试字符串显示原始 RDD 现在有10分区:

(32) PythonRDD[11] at RDD at PythonRDD.scala:48 []
 |   MapPartitionsRDD[10] at javaToPython at NativeMethodAccessorImpl.java:-2 []
 |   MapPartitionsRDD[9] at javaToPython at NativeMethodAccessorImpl.java:-2 []
 |   ShuffledRowRDD[8] at javaToPython at NativeMethodAccessorImpl.java:-2 []
 +-(10) MapPartitionsRDD[7] at javaToPython at NativeMethodAccessorImpl.java:-2 []
    |   MapPartitionsRDD[6] at javaToPython at NativeMethodAccessorImpl.java:-2 []
    |   JDBCRDD[5] at javaToPython at NativeMethodAccessorImpl.java:-2 []

我的理解崩溃的地方是,您可以看到32在问题的调试字符串和上面的这个字符串中都有手动/显式重新分区,我认为这足以缓解saveAsNewAPIHadoopFile调用的内存压力,但显然来自原始spark.jdbc.read事务甚至下游的 Dataframe(变成 RDD) 。

于 2017-12-01T16:58:13.253 回答