3

当我尝试使用以下代码使用MLeap序列化模型时:

import mleap.pyspark
from mleap.pyspark.spark_support import SimpleSparkSerializer

# Import standard PySpark Transformers and packages
from pyspark.ml.feature import VectorAssembler, StandardScaler, OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline, PipelineModel
from pyspark.sql import Row

# Create a test data frame
l = [('Alice', 1), ('Bob', 2)]
rdd = sc.parallelize(l)
Person = Row('name', 'age')
person = rdd.map(lambda r: Person(*r))
df2 = spark.createDataFrame(person)
df2.collect()

# Build a very simple pipeline using two transformers
string_indexer = StringIndexer(inputCol='name', outputCol='name_string_index')

feature_assembler = VectorAssembler(inputCols=[string_indexer.getOutputCol()], outputCol="features")

feature_pipeline = [string_indexer, feature_assembler]

featurePipeline = Pipeline(stages=feature_pipeline)

fittedPipeline = featurePipeline.fit(df2)


# serialize the model:
fittedPipeline.serializeToBundle("jar:file:/tmp/pyspark.example.zip", fittedPipeline.transform(df2))

但是我收到以下错误:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-2-98a49e4cd023> in <module>()
----> 1 fittedPipeline.serializeToBundle("jar:file:/tmp/pyspark.example.zip", fittedPipeline.transform(df2))

/opt/anaconda2/envs/py345/lib/python3.4/site-packages/mleap/pyspark/spark_support.py in serializeToBundle(self, path, dataset)
     22 
     23 def serializeToBundle(self, path, dataset=None):
---> 24     serializer = SimpleSparkSerializer()
     25     serializer.serializeToBundle(self, path, dataset=dataset)
     26 

/opt/anaconda2/envs/py345/lib/python3.4/site-packages/mleap/pyspark/spark_support.py in __init__(self)
     37     def __init__(self):
     38         super(SimpleSparkSerializer, self).__init__()
---> 39         self._java_obj = _jvm().ml.combust.mleap.spark.SimpleSparkSerializer()
     40 
     41     def serializeToBundle(self, transformer, path, dataset):

TypeError: 'JavaPackage' object is not callable

请协助?

4

2 回答 2

4

我设法通过下载并指向 spark 提交脚本上丢失的 jar 文件来解决这个问题。就我而言,我已经安装了 MLeap 0.8.1 并使用了基于 Scalar11 构建的 Spark2,因此我从MvnRepository下载了以下 jar 文件:

  • 指标-核心-2.2.0
  • mleap-base_2.11-0.8.1
  • mleap-core_2.11-0.8.1
  • mleap-runtime_2.11-0.8.1
  • mleap-spark_2.11-0.8.1
  • mleap-spark-base_2.11-0.8.1
  • mleap-tensor_2.11-0.8.1

然后我还使用我的 spark 提交文件上的标志指向了这个 jar 文件--jar,如下所示(我还使用--repository标志指向了 maven 存储库):

export PYSPARK_SUBMIT_ARGS='--master yarn --deploy-mode client --driver-memory 40g --num-executors 15 --executor-memory 30g --executor-cores 5 --packages ml.combust.mleap:mleap-runtime_2.11:0.8.1 --repositories http://YOUR MAVEN REPO/ --jars arpack_combined_all-0.1.jar,mleap-base_2.11-0.8.1.jar,mleap-core_2.11-0.8.1.jar,mleap-runtime_2.11-0.8.1.jar,mleap-spark_2.11-0.8.1.jar,mleap-spark-base_2.11-0.8.1.jar,mleap-tensor_2.11-0.8.1.jar pyspark-shell'
jupyter notebook --no-browser --ip=$(hostname -f)

-来源

于 2018-08-28T11:51:54.237 回答
0

@Tshilidzi Madau 的答案是正确的 - 您需要做的是将mleap-sparkjar 添加到您的 spark 类路径中。

pyspark 中的一个选项是spark.jars.packages在创建时设置配置SparkSession

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config('spark.jars.packages', 'ml.combust.mleap:mleap-spark_2.12:0.19.0') \
    .config("spark.jars.excludes", "net.sourceforge.f2j:arpack_combined_all") \ # this exclude is needed as this lib seems not to be available in public maven repos
    .getOrCreate()

我用 Spark3.0.3和 mleap测试了它0.19.0

于 2022-02-21T14:00:14.950 回答