I am trying to sort 25 million integers, but when I call collect() it fails with an OutOfMemoryError: Java heap space. Here is the source code:
from pyspark import SparkContext

sc = SparkContext("local", "pyspark")
numbers = sc.textFile("path of text file")
# Split each line into tokens, key each integer, and sort by key.
counts = numbers.flatMap(lambda line: line.split()) \
                .map(lambda token: (int(token), 1)) \
                .sortByKey()
# collect() pulls every sorted pair back into the driver process.
num_list = []
for (num, count) in counts.collect():
    num_list.append(num)
Where am I going wrong? The text file is 147 MB, all settings are at their defaults, and I am running Spark v0.9.0.
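For reference, the variant I would try next keeps the sort distributed and avoids materializing all 25 million values in the driver at once. This is only a sketch (the output directory name is a placeholder; saveAsTextFile and take are standard RDD actions):

from pyspark import SparkContext

sc = SparkContext("local", "pyspark")
sorted_nums = (sc.textFile("path of text file")
                 .flatMap(lambda line: line.split())
                 .map(lambda token: (int(token), 1))
                 .sortByKey()
                 .map(lambda kv: kv[0]))

# Write the sorted numbers out in parallel instead of collect()ing them.
sorted_nums.saveAsTextFile("sorted_output")  # placeholder output directory

# Or bring back only a bounded prefix for inspection.
print(sorted_nums.take(20))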
Edit: it works with a file of 2.5 million integers, but the problem starts at 5 million. I also tested with 10 million and got the same OOM error.
Here is the stack trace:
14/02/06 22:44:31 ERROR Executor: Exception in task ID 5
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2798)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:111)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1870)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1779)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1186)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:28)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:48)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:223)
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:679)
14/02/06 22:44:31 WARN TaskSetManager: Lost TID 5 (task 0.0:0)
14/02/06 22:44:31 WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2798)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:111)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1870)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1779)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1186)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:28)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:48)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:223)
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:679)
14/02/06 22:44:31 ERROR TaskSetManager: Task 0.0:0 failed 1 times; aborting job
14/02/06 22:44:31 INFO TaskSchedulerImpl: Remove TaskSet 0.0 from pool
14/02/06 22:44:31 INFO DAGScheduler: Failed to run collect at <ipython-input-7-cf9439751c70>:1
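Edit 2: one thing I still plan to try is giving the local-mode JVM more heap. A sketch of how I would set it (I am assuming here that PySpark 0.9 accepts a SparkConf, and I am not sure whether spark.executor.memory even applies to the single in-process executor in local mode):

from pyspark import SparkConf, SparkContext

# Assumption: the conf keyword is honored by PySpark 0.9, and
# spark.executor.memory affects the local-mode heap.
conf = (SparkConf()
        .setMaster("local")
        .setAppName("pyspark")
        .set("spark.executor.memory", "2g"))
sc = SparkContext(conf=conf)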