
I am using Spark (2.2.0) with Elasticsearch Hadoop (7.6.0). The purpose of the Spark job is to process records from parquet files and append them to documents that already exist in Elasticsearch. Since Elasticsearch does not support this kind of update, the part that fetches the existing records and updates them is handled by the job.
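Roughly, the flow the job implements looks like the sketch below. This is only an illustration of the approach, not my actual code: the parquet path, the key column "keyCol" and the merge step are placeholders, and it assumes sparkSession and index are in scope as in the snippets further down.

// Hypothetical sketch of the fetch-merge-write flow (placeholder names).
val updates  = sparkSession.read.parquet("/path/to/updates")   // new records coming from parquet
val existing = sparkSession.read.format("es").load(index)      // documents already in Elasticsearch

// Keep only the documents being touched, merge the new fields onto them
// (the actual merge logic is omitted here), then write them back to the same index.
val merged = existing.join(updates, Seq("keyCol"))

merged.write.format("es").mode("append").save(index)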

I have roughly 20 million records in the index. At any given time I do not need all of them, so I use filter pushdown to fetch only the documents that are actually required.

The maximum number of terms you can push down is 65536, a limit that exists for performance reasons. I bumped it up to 100K but did not raise it further, since the average number of records fetched per run is between 2 and 3 million.
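For reference, I believe the 65536 ceiling is the index-level index.max_terms_count setting in Elasticsearch (65536 is its default); that this is the limit in play here is my assumption. A minimal sketch of how it can be bumped from the driver before the job runs, using plain java.net HTTP (host, port and index name are placeholders):

import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets

// Hypothetical helper (not part of the job itself): raise the terms limit on the target index.
def raiseTermsLimit(esHost: String, index: String, limit: Int): Unit = {
  val body = s"""{"index": {"max_terms_count": $limit}}"""
  val conn = new URL(s"http://$esHost:9200/$index/_settings")
    .openConnection().asInstanceOf[HttpURLConnection]
  conn.setRequestMethod("PUT")
  conn.setRequestProperty("Content-Type", "application/json")
  conn.setDoOutput(true)
  val out = conn.getOutputStream
  out.write(body.getBytes(StandardCharsets.UTF_8))
  out.close()
  println(s"PUT _settings returned HTTP ${conn.getResponseCode}")  // expect 200
  conn.disconnect()
}

// raiseTermsLimit("localhost", "my-index", 100000)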

So the goal is to create DataFrames that each fetch 100K records per request and combine them with union (for 2-3 million keys that means roughly 20-30 DataFrames being unioned).

Part of my code looks like this:

// DataFrame backed by the existing Elasticsearch index
val df = sparkSession
  .sqlContext
  .read
  .format("es")
  .load(index)

val CURSOR_SIZE = 100000

// Split the filter values into batches of CURSOR_SIZE, build one filtered DataFrame per batch, union them
val cursors = filter._2.grouped(CURSOR_SIZE)
cursors.map(cursor => df.filter($"${filter._1}".isin(cursor: _*))).reduce(_ union _)

With the code above, the Spark UI gets stuck after the collect() call, without launching any tasks, until I get the OOM error below (screenshot: Spark UI stuck).

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at java.base/java.util.Arrays.copyOf(Arrays.java:3745)
    at java.base/java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:120)
    at java.base/java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:95)
    at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:156)
    at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
    at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1883)
    at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1792)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1190)
    at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:389)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.joins.SortMergeJoinExec.doExecute(SortMergeJoinExec.scala:136)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)

To try parallelizing the work with an RDD, I attempted the following:

sparkSession
  .sparkContext
  .parallelize(cursors.toSeq)  // grouped() returns an Iterator, so it is materialized here
  .map(cursor => df.filter($"${filter._1}".isin(cursor: _*)))
  .reduce(_ union _)

which throws a NullPointerException.

I understand the problem with the second approach: DataFrames and RDDs are driver-side abstractions, so the executors cannot operate on them.
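To make that point concrete, the closest driver-side equivalent I can think of is to parallelize the loop with a plain Scala parallel collection instead of an RDD, so that every DataFrame is still constructed where the SparkSession lives. This is only a sketch to illustrate the driver/executor distinction, not a fix for the OOM above:

// Sketch only: keep the mapping over the cursor batches on the driver.
val cursorBatches = filter._2.grouped(CURSOR_SIZE).toSeq  // materialize the batches once
val combined = cursorBatches.par
  .map(cursor => df.filter($"${filter._1}".isin(cursor: _*)))  // built on the driver, in parallel threads
  .reduce(_ union _)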

But after trying all of this, I am out of ideas about what else to try. I would really appreciate it if someone could point me in the right direction.

Thanks!!

UPDATE: Updated the code snippet to focus more closely on the problem.
