23

我在 PySpark 中运行一些操作,最近增加了我的配置(在 Amazon EMR 上)中的节点数量。然而,即使我将节点数量增加了两倍(从 4 个到 12 个),性能似乎并没有改变。因此,我想看看新节点是否对 Spark 可见。

我正在调用以下函数:

sc.defaultParallelism
>>>> 2

但我认为这告诉我分配给每个节点的任务总数,而不是 Spark 可以看到的节点总数。

如何查看 PySpark 在集群中使用的节点数量?

4

5 回答 5

27

在 pyspark 上,您仍然可以getExecutorMemoryStatus使用 pyspark 的 py4j 桥接器调用 scala API:

sc._jsc.sc().getExecutorMemoryStatus().size()
于 2017-02-06T09:40:37.233 回答
19

sc.defaultParallelism只是一个提示。根据配置,它可能与节点数量无关。如果您使用带有分区计数参数但您不提供它的操作,则这是分区数。例如sc.parallelize将从列表中创建一个新的 RDD。您可以使用第二个参数告诉它要在 RDD 中创建多少个分区。但此参数的默认值为sc.defaultParallelism.

您可以sc.getExecutorMemoryStatus在 Scala API 中获取执行器的数量,但这不会在 Python API 中公开。

一般来说,建议是 RDD 中的分区数量大约是执行程序数量的 4 倍。这是一个很好的提示,因为如果任务花费的时间有差异,这将平衡它。例如,一些执行器将处理 5 个更快的任务,而其他执行器将处理 3 个较慢的任务。

您不需要对此非常准确。如果您有一个粗略的想法,您可以进行估算。就像如果你知道你有少于 200 个 CPU,你可以说 500 个分区就可以了。

所以尝试用这个数量的分区创建 RDD:

rdd = sc.parallelize(data, 500)     # If distributing local data.
rdd = sc.textFile('file.csv', 500)  # If loading data from a file.

如果您不控制 RDD 的创建,或者在计算之前重新分区 RDD:

rdd = rdd.repartition(500)

您可以使用 . 检查 RDD 中的分区数rdd.getNumPartitions()

于 2015-03-01T12:15:25.533 回答
5

应该可以使用此方法获取集群中的节点数(类似于上面@Dan 的方法,但更短且效果更好!)。

sc._jsc.sc().getExecutorMemoryStatus().keySet().size()
于 2018-01-05T10:14:58.633 回答
3

其他答案提供了一种获取执行者数量的方法。这是一种获取节点数量的方法。这包括头节点和工作节点。

s = sc._jsc.sc().getExecutorMemoryStatus().keys()
l = str(s).replace("Set(","").replace(")","").split(", ")

d = set()
for i in l:
    d.add(i.split(":")[0])
len(d)  
于 2017-10-31T18:51:55.080 回答
1

我发现有时我的会话被远程杀死,给出一个奇怪的 Java 错误

Py4JJavaError: An error occurred while calling o349.defaultMinPartitions.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.

我通过以下方式避免了这种情况

def check_alive(spark_conn):
    """Check if connection is alive. ``True`` if alive, ``False`` if not"""
    try:
        get_java_obj = spark_conn._jsc.sc().getExecutorMemoryStatus()
        return True
    except Exception:
        return False

def get_number_of_executors(spark_conn):
    if not check_alive(spark_conn):
        raise Exception('Unexpected Error: Spark Session has been killed')
    try:
        return spark_conn._jsc.sc().getExecutorMemoryStatus().size()
    except:
        raise Exception('Unknown error')
于 2017-07-04T18:24:57.600 回答