I am using Spark (2.2.0) with ElasticSearch Hadoop (7.6.0).
The purpose of the Spark job is to process records from parquet files and append them to documents that already exist in ElasticSearch. Since ElasticSearch does not support updates, fetching the existing records and updating them is handled by the job itself.
I have about 20 million records in the index. I never need all of them at once, so I use filter pushdown to fetch only as many documents as I need.
For performance reasons, the maximum number of terms you can push down is 65536. I raised it to 100K, but did not go any further because the average number of records fetched is between 2 and 3 million.
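Just to put those numbers together (nothing measured, only the figures already mentioned above): at 100K values per request, an average run of 2-3 million values ends up as roughly 20-30 chunked reads that all get unioned into a single plan. A back-of-the-envelope sketch, with the 2.5 million figure picked purely for illustration:
// Rough arithmetic behind the chunking (illustrative only)
val avgValuesFetched = 2500000                     // ~2-3 million values on an average run
val cursorSize       = 100000                      // terms pushed down per request
val numCursors = math.ceil(avgValuesFetched.toDouble / cursorSize).toInt
// numCursors == 25, i.e. ~25 filtered DataFrames, each carrying 100K isin() literals, get unioned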
So the goal is to create DataFrames that each fetch 100K records per request, and then union them.
Part of my code is below:
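// read the index through the elasticsearch-hadoop Spark SQL data source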
val df = sparkSession
.sqlContext
.read
.format("es")
.load(index)
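
// split the filter values into chunks of 100K and union the per-chunk filtered reads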
val CURSOR_SIZE = 100000
val cursors = filter._2.grouped(CURSOR_SIZE)
cursors.map(cursor => df.filter($"${filter._1}".isin(cursor:_*))).reduce(_ union _)
With the above code, the Spark UI gets stuck after the collect() call, without launching any tasks, until I receive an OOM error:
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at java.base/java.util.Arrays.copyOf(Arrays.java:3745)
at java.base/java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:120)
at java.base/java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:95)
at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:156)
at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1883)
at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1792)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1190)
at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:389)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.joins.SortMergeJoinExec.doExecute(SortMergeJoinExec.scala:136)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
To try to parallelize this using RDDs, I tried the following:
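// distribute the cursors as an RDD and build each chunked DataFrame inside the map (this fails)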
session
.sparkContext
.parallelize(cursors)
.map(cursor => df.filter($"${filter._1}".isin(cursor:_*)))
.reduce(_ union _)
which throws a NullPointerException.
I understand the problem with the second approach: DataFrame and RDD are driver-side abstractions, so executors cannot operate on them.
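To make that concrete, here is a minimal sketch of the same pitfall (illustrative only; the field name and values are placeholders):
// A DataFrame is a driver-side handle: capturing `df` in a task closure and
// using it inside an executor typically fails (often with an NPE), because the
// SparkSession it depends on is not available inside tasks.
sparkSession.sparkContext
  .parallelize(Seq("a", "b"))                               // runs on executors
  .map(value => df.filter($"someField" === value).count())  // df unusable here
  .collect()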
But after trying all of this, I have run out of ideas on what else to try. I would really appreciate it if someone could point me in the right direction.
Thanks !!
UPDATE: Updated the code snippet to reflect the problem more closely.