我在使用特定的 spark 方法时遇到问题,saveAsNewAPIHadoopFile
. 上下文是我正在使用 pyspark,将具有 1k、10k、50k、500k、1m 记录的 RDD 索引到 ElasticSearch (ES) 中。
由于各种原因,Spark 上下文在使用 2gb 驱动程序和单个 2gb 执行程序时的性能非常低下。
直到大约 500k,当我遇到 java 堆大小问题时,我都没有问题。增加到大约4gb spark.driver.memory
,我可以索引更多。但是,它的工作时间是有限制的,我们希望索引超过 500k、1m、5m、20m 的记录。
由于各种原因,也受限于使用 pyspark。瓶颈和断点似乎是一个名为 的火花阶段take at SerDeUtil.scala:233
,无论 RDD 进入多少个分区,它都会下降到一个,我假设这是驱动程序收集分区并准备索引。
现在 - 我想知道是否有一种有效的方法仍然使用如下方法,考虑到该约束:
to_index_rdd.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf={
"es.resource":"%s/record" % index_name,
"es.nodes":"192.168.45.10:9200",
"es.mapping.exclude":"temp_id",
"es.mapping.id":"temp_id",
}
)
为了寻求好的解决方案,我还不如晾一些脏衣服。我有一个非常低效的解决方法,用于zipWithIndex
分块 RDD,并将这些子集发送到上面的索引函数。看起来有点像这样:
def index_chunks_to_es(spark=None, job=None, kwargs=None, rdd=None, chunk_size_limit=10000):
# zip with index
zrdd = rdd.zipWithIndex()
# get count
job.update_record_count(save=False)
count = job.record_count
# determine number of chunks
steps = count / chunk_size_limit
if steps % 1 != 0:
steps = int(steps) + 1
# evenly distribute chunks, while not exceeding chunk_limit
dist_chunk_size = int(count / steps) + 1
# loop through steps, appending subset to list for return
for step in range(0, steps):
# determine bounds
lower_bound = step * dist_chunk_size
upper_bound = (step + 1) * dist_chunk_size
print(lower_bound, upper_bound)
# select subset
rdd_subset = zrdd.filter(lambda x: x[1] >= lower_bound and x[1] < upper_bound).map(lambda x: x[0])
# index to ElasticSearch
ESIndex.index_job_to_es_spark(
spark,
job=job,
records_df=rdd_subset.toDF(),
index_mapper=kwargs['index_mapper']
)
如果我理解正确的话,它会很慢,因为zipWithIndex
,filter
和map
会针对每个生成的 RDD 子集进行评估。 但是,它的内存效率也很高,因为 500k、1m、5m 等记录永远不会被发送到saveAsNewAPIHadoopFile
这些较小的 RDD,相对较小的火花驱动器可以处理。
对于不同方法的任何建议将不胜感激。也许这意味着现在使用Elasticsearch-Hadoop
连接器,而是将原始 JSON 发送到 ES?
更新:
看起来我仍然在使用此解决方法时遇到 java 堆空间错误,但离开这里是为了演示对可能解决方法的思考。没想到zipWithIndex
会收集驱动程序上的所有内容(我假设这里就是这种情况)
更新#2
这是我尝试运行的 RDD 的调试字符串saveAsNewAPIHadoopFile
:
(32) PythonRDD[6] at RDD at PythonRDD.scala:48 []
| MapPartitionsRDD[5] at javaToPython at NativeMethodAccessorImpl.java:-2 []
| MapPartitionsRDD[4] at javaToPython at NativeMethodAccessorImpl.java:-2 []
| ShuffledRowRDD[3] at javaToPython at NativeMethodAccessorImpl.java:-2 []
+-(1) MapPartitionsRDD[2] at javaToPython at NativeMethodAccessorImpl.java:-2 []
| MapPartitionsRDD[1] at javaToPython at NativeMethodAccessorImpl.java:-2 []
| JDBCRDD[0] at javaToPython at NativeMethodAccessorImpl.java:-2 []
更新#3
下面是take at SerDeUtil.scala:233
似乎运行的 DAG 可视化driver/localhost
:
还有一个 DAG 用于saveAsNewAPIHadoopFile
更小的工作(大约 1k 行),因为 500k 行的尝试从未真正触发,因为SerDeUtil
上面的阶段似乎触发了较大 RDD 的 java 堆大小问题: