I'm trying to run a job on YARN that processes a large amount of data (2TB) read from Google Cloud Storage.
The pipeline can be summarized as follows:
sc.textFile("gs://path/*.json")\
.map(lambda row: json.loads(row))\
.map(toKvPair)\
.groupByKey().take(10)
[...] later processing on the collections and output to GCS.
This computation over the elements of the collections is not associative; each element is sorted within its keyspace.
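For completeness: this runs from the pyspark shell (so sc is already provided), and toKvPair below is a simplified stand-in for my real key-extraction function ("primary_key" is a placeholder field name):

import json  # used by the json.loads step in the pipeline above

def toKvPair(record):
    # Simplified stand-in: the real function returns (primary_key, record)
    # for each parsed JSON record.
    return (record["primary_key"], record)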
It completes without any problem when run on 10GB of data. However, when I run it on the full dataset, it always fails with this log in the containers:
15/11/04 16:08:07 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster has disassociated: xxxxxxxxxxx
15/11/04 16:08:07 ERROR org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend: Yarn application has already exited with state FINISHED!
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/spark/python/pyspark/rdd.py", line 1299, in take
res = self.context.runJob(self, takeUpToNumLeft, p)
File "/usr/lib/spark/python/pyspark/context.py", line 916, in runJob
15/11/04 16:08:07 WARN org.apache.spark.ExecutorAllocationManager: No stages are running, but numRunningTasks != 0
port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 36, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
I tried to investigate by connecting to the master and launching each operation one by one, and it seems to fail on the groupBy. I also tried to re-size the cluster by adding nodes and upgrading their memory and number of CPUs, but I still get the same problem.
120 nodes + 1 master with the same specs: 8 vCPUs - 52GB of memory
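In case it's relevant, the resource settings I've been experimenting with look roughly like this (the exact numbers are illustrative rather than the precise values of every run):

from pyspark import SparkConf, SparkContext

# Illustrative settings only; I've been varying executor memory, cores and
# parallelism between runs while re-sizing the cluster.
conf = (SparkConf()
        .setMaster("yarn-client")
        .setAppName("gcs-groupby")
        .set("spark.executor.instances", "120")        # one executor per worker node
        .set("spark.executor.memory", "26g")           # about half of each 52GB node
        .set("spark.executor.cores", "4")
        .set("spark.yarn.executor.memoryOverhead", "4096")
        .set("spark.default.parallelism", "2000"))
sc = SparkContext(conf=conf)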
I tried to find threads describing a similar problem without success, and since the logs are not very clear I don't really know what information I should provide, so feel free to ask for more.
The primary key is a required value on every record, and we need all of the keys without any filtering, which represents roughly 600k keys. Is it really possible to perform this operation without scaling the cluster to something massive? I just read that Databricks sorted 100TB of data (https://databricks.com/blog/2014/10/10/spark-petabyte-sort.html), which also involved a massive shuffle. They apparently succeeded by replacing multiple in-memory buffers with a single one, at the cost of a lot of disk IO. Is this kind of operation feasible with my cluster size?
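Since the later per-key processing needs each key's elements in sorted order anyway, I'm also wondering whether something like repartitionAndSortWithinPartitions plus a streaming group-by inside each partition would behave better than groupByKey at this scale. A rough sketch of what I mean (the partition count and the per-key computation are placeholders, and kv_pairs is just the pipeline above without the groupByKey):

import itertools
import json

NUM_PARTITIONS = 2000  # placeholder; would need tuning for ~2TB of shuffle data

kv_pairs = (sc.textFile("gs://path/*.json")
              .map(lambda row: json.loads(row))
              .map(toKvPair))                      # same (key, record) pairs as above

def process_partition(records):
    # After repartitionAndSortWithinPartitions, each partition is sorted by key,
    # so consecutive records with the same key form one group and can be
    # consumed as a stream instead of one large collected list per key.
    for key, group in itertools.groupby(records, key=lambda kv: kv[0]):
        values = [value for _, value in group]     # one key's values (~600k keys total)
        yield (key, len(values))                   # placeholder for the real per-key computation

results = (kv_pairs
           .repartitionAndSortWithinPartitions(numPartitions=NUM_PARTITIONS)
           .mapPartitions(process_partition))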