1

在比较 pyspark 3.2.1 中提供在 Spark Dataframe 的分组结果上运行 pandas UDF 的能力的两个 API 时,我遇到了奇怪的性能结果:

首先,我在本地火花模式 (Spark 3.2.1) 下运行以下输入生成器代码:

import pyspark.sql.types as types
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
import pyspark.pandas as ps

spark = SparkSession.builder \
    .config("spark.sql.execution.arrow.pyspark.enabled", True) \
    .getOrCreate()

ps.set_option("compute.default_index_type", "distributed")

spark.range(1000000).withColumn('group', (col('id') / 10).cast('int')) \
    .write.parquet('/tmp/sample_input', mode='overwrite')

然后我测试applyInPandas

def getsum(pdf):
    pdf['sum_in_group'] = pdf['id'].sum()
    return pdf

df = spark.read.parquet(f'/tmp/sample_input')
output_schema = types.StructType(
    df.schema.fields + [types.StructField('sum_in_group', types.FloatType())]
)
df.groupBy('group').applyInPandas(getsum, schema=output_schema) \
    .write.parquet('/tmp/schematest', mode='overwrite')

并且代码在 30 秒内执行(在 i7-9750H CPU 上)

然后,我尝试了新的 API 并且 - 虽然我真的很欣赏代码看起来多么漂亮:

def getsum(pdf) -> ps.DataFrame["id": int, "group": int, "sum_in_group": int]:
    pdf['sum_in_group'] = pdf['id'].sum()
    return pdf

df = ps.read_parquet(f'/tmp/sample_input')
df.groupby('group').apply(getsum) \
    .to_parquet('/tmp/schematest', mode='overwrite')

...每次在同一个 CPU 上执行时间至少为 1m 40s ,因此对于这个简单的操作来说,速度要慢 3 倍以上。

我知道添加sum_in_group可以在没有 panadas 参与的情况下更有效地完成,但这只是为了提供一个小的最小示例。任何其他操作也至少慢 3 倍。

你知道这种放缓的原因是什么吗?也许我缺少一些可以使这些在相似时间执行的上下文参数?

4

0 回答 0