I'm very new to the big data technologies I'm trying to work with, but so far I've managed to set up sparklyr in RStudio to connect to a standalone Spark cluster. The data are stored in Cassandra, and I can successfully bring large datasets into Spark memory (cache) to run further analysis on them.
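For reference, the connect-read-cache pattern I'm using looks roughly like the sketch below (the master URL and keyspace/table names are placeholders, and the config file is the one shown in full further down):

library(sparklyr)
library(dplyr)

# load my yml config (shown below) and connect to the standalone cluster
conf <- spark_config(file = "config.yml")
sc <- spark_connect(master = "spark://<master_ip>:7077", config = conf)

# read a Cassandra table through the DataStax connector and cache it in Spark memory
df <- spark_read_source(
  sc,
  name    = "my_table",
  source  = "org.apache.spark.sql.cassandra",
  options = list(keyspace = "my_keyspace", table = "my_table"),
  memory  = TRUE  # cache the full table
)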
Recently, however, I've been having a lot of trouble bringing one particularly large dataset into Spark memory, even though the cluster should have more than enough resources (60 cores, 200 GB RAM) to handle a dataset of its size.
I thought that by limiting the cached data to just a few columns of interest I could overcome the issue (using the answer code from my previous query here), but that is not the case. What happens is that the jar process on my local machine spins up to take over all of the local RAM and CPU resources, the whole process freezes, and executors on the cluster keep getting dropped and re-added. Weirdly, this happens even when I select only 1 row for caching (which should make this dataset much smaller than other datasets I have had no problem caching into Spark memory).
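For what it's worth, the column-limiting approach is along these lines (a sketch with placeholder column names rather than my exact code, reusing sc from the snippet above):

# register the Cassandra table without caching it, then cache only the selected columns
df_all <- spark_read_source(
  sc,
  name    = "my_table",
  source  = "org.apache.spark.sql.cassandra",
  options = list(keyspace = "my_keyspace", table = "my_table"),
  memory  = FALSE  # don't pull the full table into memory yet
)

df_small <- df_all %>%
  select(col_a, col_b, col_c) %>%  # placeholder columns of interest
  compute("my_table_small")        # materialise/cache just the selection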
I've looked through the logs, and these seem to be the only informative errors/warnings early on in the process:
17/03/06 11:40:27 ERROR TaskSchedulerImpl: Ignoring update with state FINISHED for TID 33813 because its task set is gone (this is likely the result of receiving duplicate task finished status updates) or its executor has been marked as failed.
17/03/06 11:40:27 INFO DAGScheduler: Resubmitted ShuffleMapTask(0, 8167), so marking it as still running
...
17/03/06 11:46:59 WARN TaskSetManager: Lost task 3927.3 in stage 0.0 (TID 54882, 213.248.241.186, executor 100): ExecutorLostFailure (executor 100 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 167626 ms
17/03/06 11:46:59 INFO DAGScheduler: Resubmitted ShuffleMapTask(0, 3863), so marking it as still running
17/03/06 11:46:59 WARN TaskSetManager: Lost task 4300.3 in stage 0.0 (TID 54667, 213.248.241.186, executor 100): ExecutorLostFailure (executor 100 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 167626 ms
17/03/06 11:46:59 INFO DAGScheduler: Resubmitted ShuffleMapTask(0, 14069), so marking it as still running
Then after 20 or so minutes the whole job crashes with:
java.lang.OutOfMemoryError: GC overhead limit exceeded
I have already changed my connection config to increase the heartbeat interval (spark.executor.heartbeatInterval: '180s'), and have looked at increasing the memory overhead (spark.yarn.executor.memoryOverhead).
In my config file, I experimented by adding each of the following settings one at a time (none of them worked); a sketch of setting these programmatically follows the list:
spark.memory.fraction: 0.3
spark.executor.extraJavaOptions: '-Xmx24g'
spark.driver.memory: "64G"
spark.driver.extraJavaOptions: '-XX:MaxHeapSize=1024m'
spark.driver.extraJavaOptions: '-XX:+UseG1GC'
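As I understand it, the same options can also be set programmatically on the config object before connecting, along these lines (again only a sketch, with a placeholder master URL):

library(sparklyr)

conf <- spark_config()  # or spark_config(file = "config.yml")
conf$spark.memory.fraction <- 0.3
conf$spark.executor.extraJavaOptions <- "-Xmx24g"
conf$spark.driver.memory <- "64G"
sc <- spark_connect(master = "spark://<master_ip>:7077", config = conf)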
Update: my current full yml config file is below:
default:
  # local settings
  sparklyr.sanitize.column.names: TRUE
  sparklyr.cores.local: 3
  sparklyr.shell.driver-memory: "8G"
  # remote core/memory settings
  spark.executor.memory: "32G"
  spark.executor.cores: 5
  spark.executor.heartbeatInterval: '180s'
  spark.ext.h2o.nthreads: 10
  spark.cores.max: 30
  spark.memory.storageFraction: 0.6
  spark.memory.fraction: 0.3
  spark.network.timeout: 300
  spark.driver.extraJavaOptions: '-XX:+UseG1GC'
  # other configs for spark
  spark.serializer: org.apache.spark.serializer.KryoSerializer
  spark.executor.extraClassPath: /var/lib/cassandra/jar/guava-18.0.jar
  # cassandra settings
  spark.cassandra.connection.host: <cassandra_ip>
  spark.cassandra.auth.username: <cassandra_login>
  spark.cassandra.auth.password: <cassandra_pass>
  spark.cassandra.connection.keep_alive_ms: 60000
  # spark packages to load
  sparklyr.defaultPackages:
  - "com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M1"
  - "com.databricks:spark-csv_2.11:1.3.0"
  - "com.datastax.cassandra:cassandra-driver-core:3.0.2"
  - "com.amazonaws:aws-java-sdk-pom:1.10.34"
So my questions are:
- Does anyone have any ideas about what to do in this situation?
- Are there config settings I can change to help with this issue?
- Alternatively, is there a way to import the cassandra data in batches with RStudio/sparklyr as the driver?
- Or, is there a way to munge/filter/edit the data as it is brought into the cache so that the resulting table is smaller (similar to using SQL querying, but with more complex dplyr syntax)? A sketch of what I mean by this follows these questions.
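To illustrate that last point, what I have in mind is a lazily-evaluated dplyr pipeline where only the filtered/aggregated result is ever cached, something like the following (placeholder column names, reusing df_all from the earlier sketch):

df_small <- df_all %>%
  filter(some_date_col >= "2017-01-01") %>%  # placeholder filter
  group_by(some_id_col) %>%
  summarise(n = n()) %>%
  compute("filtered_summary")  # cache only the much smaller result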