16

我正在对集群进行计算,最后当我使用 df.describe().show() 询问我的 Spark 数据帧的摘要统计信息时,我收到一个错误:

序列化任务 15:0 为 137500581 字节,超过了允许的最大值:spark.rpc.message.maxSize(134217728 字节)。考虑增加 spark.rpc.message.maxSize 或对大值使用广播变量

在我的 Spark 配置中,我已经尝试增加上述参数:

spark = (SparkSession
         .builder
         .appName("TV segmentation - dataprep for scoring")
         .config("spark.executor.memory", "25G")
         .config("spark.driver.memory", "40G")
         .config("spark.dynamicAllocation.enabled", "true")
         .config("spark.dynamicAllocation.maxExecutors", "12")
         .config("spark.driver.maxResultSize", "3g")
         .config("spark.kryoserializer.buffer.max.mb", "2047mb")
         .config("spark.rpc.message.maxSize", "1000mb")
         .getOrCreate())

我还尝试使用以下方法重新分区我的数据框:

dfscoring=dfscoring.repartition(100)

但我仍然不断收到同样的错误。

我的环境:Python 3.5、Anaconda 5.0、Spark 2

我怎样才能避免这个错误?

4

3 回答 3

12

我有同样的麻烦,然后我解决它。原因是spark.rpc.message.maxSize如果默认设置128M,您可以在启动 spark 客户端时更改它,我在 pyspark 中工作并将值设置为 1024,所以我这样写:

pyspark --master yarn --conf spark.rpc.message.maxSize=1024

解决这个问题。

于 2019-10-08T03:04:47.037 回答
6

我有同样的问题,它浪费了我生命中的一天,我再也回不来了。我不确定为什么会发生这种情况,但这是我如何让它为我工作的。

第 1 步:确保 PYSPARK_PYTHON 和 PYSPARK_DRIVER_PYTHON 设置正确。 结果发现worker(2.6)中的python与驱动程序(3.6)中的版本不同。您应该检查环境变量 PYSPARK_PYTHON 和 PYSPARK_DRIVER_PYTHON 是否设置正确。

我通过简单地将我的内核从 Python 3 Spark 2.2.0 切换到 Jupyter 中的 Python Spark 2.3.1 来修复它。您可能必须手动设置它。以下是如何确保您的 PySpark 设置正确https://mortada.net/3-easy-steps-to-set-up-pyspark.html

第 2 步:如果这不起作用,请尝试解决它: 这个内核开关适用于我没有添加任何列的 DF:spark_df -> panda_df -> back_to_spark_df .... 但它在我添加了 5 个额外列的 DF。所以我尝试过并且有效的方法如下:

# 1. Select only the new columns: 

    df_write = df[['hotel_id','neg_prob','prob','ipw','auc','brier_score']]


# 2. Convert this DF into Spark DF:



     df_to_spark = spark.createDataFrame(df_write)
     df_to_spark = df_to_spark.repartition(100)
     df_to_spark.registerTempTable('df_to_spark')


# 3. Join it to the rest of your data:

    final = df_to_spark.join(data,'hotel_id')


# 4. Then write the final DF. 

    final.write.saveAsTable('schema_name.table_name',mode='overwrite')

希望有帮助!

于 2019-03-25T13:51:51.200 回答
3

我有同样的问题,但使用 Watson Studio。我的解决方案是:

sc.stop()
configura=SparkConf().set('spark.rpc.message.maxSize','256')
sc=SparkContext.getOrCreate(conf=configura)
spark = SparkSession.builder.getOrCreate()

我希望它可以帮助某人...

于 2019-12-26T10:41:54.593 回答