1

我正在尝试实现 Uber 的 Petastorm 数据集创建,它利用 Spark 按照其Github 页面上的教程创建镶木地板文件。

编码:

spark = SparkSession.builder.config('spark.driver.memory', '10g').master('local[4]').getOrCreate()
sc = spark.sparkContext

with materialize_dataset(spark=spark, dataset_url='file:///opt/data/hello_world_dataset',
                         schema=MySchema, row_group_size_mb=256):

    logging.info('Building RDD...')
    rows_rdd = sc.parallelize(ids)\
        .map(row_generator)\  # Generator that yields lists of examples
        .flatMap(lambda x: dict_to_spark_row(MySchema, x))

    logging.info('Creating DataFrame...')
    spark.createDataFrame(rows_rdd, MySchema.as_spark_schema()) \
        .coalesce(10) \
        .write \
        .mode('overwrite') \
        .parquet('file:///opt/data/hello_world_dataset')

现在 RDD 代码成功执行,但只有.createDataFrame调用失败,并出现以下错误:

_pickle.PicklingError:无法序列化广播:溢出错误:无法序列化大于 4GiB 的字符串

这是我第一次使用 Spark,所以我无法确定这个错误是源自 Spark 还是 Petastorm。

查看此错误的其他解决方案(关于 Spark,而不是 Petastorm),我发现它可能与酸洗协议有关,但我无法确认,我也没有找到改变酸洗协议的方法。

我怎样才能避免这个错误?

4

2 回答 2

2

问题在于在不同进程之间传递数据的酸洗,默认酸洗协议是2,我们需要使用4才能传递大于4GB的对象。

要更改酸洗协议,在创建 Spark 会话之前,请使用以下代码

from pyspark import broadcast
import pickle


def broadcast_dump(self, value, f):
    pickle.dump(value, f, 4)  # was 2, 4 is first protocol supporting >4GB
    f.close()

    return f.name


broadcast.Broadcast.dump = broadcast_dump
于 2018-11-21T10:05:43.777 回答
2

建立bluesummers的答案

spark 的 master 分支现在修复了这个问题,所以我用这个代码以同样的方式修补转储功能,但更安全一点。[用 2.3.2 测试]

from pyspark import broadcast
from pyspark.cloudpickle import print_exec
import pickle

def broadcast_dump(self, value, f):
    try:
        pickle.dump(value, f, pickle.HIGHEST_PROTOCOL) 
    except pickle.PickleError:
        raise
    except Exception as e:
        msg = "Could not serialize broadcast: %s: %s" \
                % (e.__class__.__name__, _exception_message(e))
        print_exec(sys.stderr)
        raise pickle.PicklingError(msg)
    f.close()

broadcast.Broadcast.dump = broadcast_dump
于 2019-02-07T00:18:58.167 回答