我是 spark 新手,目前正在测试 pySpark。它的运行速度比我预期的要慢,我想知道我是否设置正确。
我的问题:
我有一个由 57 个分区组成的 RDD(每个分区约 30Mb),所有分区都被缓存(内存中的总大小为 1700MB)。RDD 包含 13M 个字符串,每个字符串约 300 个字符。所以一般来说不是大数据集。那么为什么运行 count() 需要 4 秒?
我检查了 UI,似乎对于“计数”作业,它运行 57 个任务(如预期的那样),每个任务需要 0.6 秒,这对我来说似乎很慢。
我在谷歌云上运行,在 Mesos 之上,有 1 个主节点和 2 个从节点。每个实例有 8 个内核和 30 GB 的 RAM。
我的问题:
每个任务 0.6 秒有意义吗?
根据 UI,每个执行器花费 18 秒运行任务。假设每个节点有 8 个核心,这应该需要 2.25 秒。那么我们是如何在最后达到 4 秒的呢?
编码:
import time
GCS_CONNECTOR_HADOOP_CONF = {
'fs.gs.impl': 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem',
'fs.gs.project.id': 'xxx',
'fs.gs.system.bucket': 'xxx',
'fs.gs.working.dir': 'spark',
'fs.gs.auth.service.account.email': 'xxx',
'fs.gs.auth.service.account.keyfile': 'xxxx'
}
def get_rdd_from_gcs_uris(spark_context,
gcs_uris,
hadoop_conf,
input_format='org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
key_type='org.apache.hadoop.io.LongWritable',
value_type='org.apache.hadoop.io.Text',
key_converter=None):
rdds = []
for gcs_uri in gcs_uris:
rdd = spark_context.newAPIHadoopFile(gcs_uri,
input_format,
key_type,
value_type,
keyConverter=key_converter,
conf=hadoop_conf).cache()
# we only care about the values, the keys are the byte offsets of each value
rdd = rdd.map(lambda x: x[1])
rdds.append(rdd)
return spark_context.union(rdds)
#Reading files from GCS (I'm reading 6 files)
rdd = get_rdd_from_gcs_uris(sc, gcs_uris, GCS_CONNECTOR_HADOOP_CONF).cache()
#Counting lines for the first time. This is suppose to be slow
rdd.count()
#Counting lines for the second time. It's 10x faster than the first time, but it takes 4 seconds
tic = time.time()
rdd.count()
print('Count took %.2f seconds' % ((time.time() - tic) / 1000))