0

我在 GCP 中启动了一个 DataProc 集群,有一个主节点和 3 个工作节点。每个节点有 8 个 vCPU 和 30G 内存。

我开发了一个 pyspark 代码,它从 GCS 读取一个 csv 文件。csv 文件大小约为 30G。

df_raw = (
    spark
        .read
        .schema(schema)
        .option('header', 'true')
        .option('quote', '"')
        .option('multiline', 'true')
        .csv(infile)
)
df_raw = df_raw.repartition(20, "Product")
print(df_raw.rdd.getNumPartitions())

这是我将 pyspark 启动到 dataproc 中的方式:

gcloud dataproc jobs submit pyspark gs://<my-gcs-bucket>/<my-program>.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \

我得到的分区号只有 1。

我在此处附上了节点使用情况图片供您参考。 在此处输入图像描述

似乎它只使用了来自一个工作节点的一个 vCore。

如何使其与多个分区并行并使用所有节点和更多 vCore?

尝试重新分区到 20,但它仍然只使用了一个工作节点的一个 vCore,如下所示:

在此处输入图像描述

Pyspark 默认分区是 200。所以我很惊讶地看到 dataproc 没有将所有可用资源用于此类任务。

4

2 回答 2

0

您可能想尝试通过--propertiesDataproc 命令行传递 Spark 配置来增加执行器的数量。所以像

gcloud dataproc jobs submit pyspark gs://<my-gcs-bucket>/<my-program>.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    --properties=spark.executor.instances=5
于 2021-05-22T04:09:57.157 回答
0

这不是数据处理问题,而是纯 Spark/pyspark 问题。

为了并行化您的数据,它需要分成多个分区 - 一个大于您拥有的执行程序(总工作核心)数量的数字。(例如 ~ *2, ~ *3, ...)

有多种方法可以做到这一点,例如:

  1. 将数据拆分为文件或文件夹并并行化文件/文件夹列表并处理每个文件(或使用已经这样做的数据库并将此分区保持在 Spark 读取中)。

  2. 获得 Spark DF 后重新分区数据,例如读取执行程序的数量并将它们乘以 N 并重新分区到这么多分区。当您执行此操作时,您必须选择将您的数据很好地划分为多个部分的列,而不是仅划分为几个部分,例如按天、按客户 ID,而不是按状态 ID。

df = df.repartition(num_partitions, 'partition_by_col1', 'partition_by_col2')

代码在主节点上运行,并行阶段分布在工作节点之间,例如

df = (
    df.withColumn(...).select(...)...
    .write(...)
)

由于 Spark 函数是惰性的,因此它们仅在您执行诸如 write 或 collect 之类的导致评估 DF 的步骤时运行。

于 2021-05-02T18:07:46.827 回答