1

我是 spark 新手,目前正在测试 pySpark。它的运行速度比我预期的要慢,我想知道我是否设置正确。

我的问题:

我有一个由 57 个分区组成的 RDD(每个分区约 30Mb),所有分区都被缓存(内存中的总大小为 1700MB)。RDD 包含 13M 个字符串,每个字符串约 300 个字符。所以一般来说不是大数据集。那么为什么运行 count() 需要 4 秒?

我检查了 UI,似乎对于“计数”作业,它运行 57 个任务(如预期的那样),每个任务需要 0.6 秒,这对我来说似乎很慢。

我在谷歌云上运行,在 Mesos 之上,有 1 个主节点和 2 个从节点。每个实例有 8 个内核和 30 GB 的 RAM。

我的问题:

  1. 每个任务 0.6 秒有意义吗?

  2. 根据 UI,每个执行器花费 18 秒运行任务。假设每个节点有 8 个核心,这应该需要 2.25 秒。那么我们是如何在最后达到 4 秒的呢?

编码:

import time
GCS_CONNECTOR_HADOOP_CONF = {
    'fs.gs.impl': 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem',
    'fs.gs.project.id': 'xxx',
    'fs.gs.system.bucket': 'xxx',
    'fs.gs.working.dir': 'spark',
    'fs.gs.auth.service.account.email': 'xxx',
    'fs.gs.auth.service.account.keyfile': 'xxxx'
}


def get_rdd_from_gcs_uris(spark_context,
                          gcs_uris,
                          hadoop_conf,
                          input_format='org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
                          key_type='org.apache.hadoop.io.LongWritable',
                          value_type='org.apache.hadoop.io.Text',
                          key_converter=None):

    rdds = []
    for gcs_uri in gcs_uris:

        rdd = spark_context.newAPIHadoopFile(gcs_uri,
                                             input_format,
                                             key_type,
                                             value_type,
                                             keyConverter=key_converter,
                                             conf=hadoop_conf).cache()
        # we only care about the values, the keys are the byte offsets of each value
        rdd = rdd.map(lambda x: x[1])
        rdds.append(rdd)
    return spark_context.union(rdds)

#Reading files from GCS (I'm reading 6 files)
rdd = get_rdd_from_gcs_uris(sc, gcs_uris, GCS_CONNECTOR_HADOOP_CONF).cache()

#Counting lines for the first time. This is suppose to be slow
rdd.count()

#Counting lines for the second time. It's 10x faster than the first time, but it takes 4 seconds
tic = time.time()
rdd.count()
print('Count took %.2f seconds' % ((time.time() - tic) / 1000))
4

1 回答 1

3

提示:

  1. 使用 Scala(或 Java)而不是 Python。我对此没有引用,但似乎常识是,将两种语言连接起来会增加效率。每个执行器将运行一个 Python 进程并通过管道与其通信。

  2. 不要unionRDD。您可以将“glob”(例如path/*.csv)传递给newAPIHadoopFile它,它会返回一个由所有匹配的文件组成的 RDD。(但这不应该count在缓存后影响。)

  3. 在 Spark UI 的 Storage 选项卡上,检查 RDD 的哪一部分被缓存。也许不是100%。

  4. 不要测量秒数。处理更多数据并测量分钟数。一个 JVM 可以花费 4 秒来执行 GC。

  5. 尝试使用更多或更少的分区。你有 57 个分区和 16 个执行线程。所以每个执行器线程将不得不多次请求更多的工作。用 16 个分区试试,这样他们只需要问一次。

于 2015-04-04T11:03:39.920 回答