0

我想将经过训练的 pyspark 模型(或管道)导入 pyspark 脚本。我训练了一个决策树模型,如下所示:

from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer

# Create assembler and labeller for spark.ml format preperation
assembler = VectorAssembler(inputCols = requiredFeatures, outputCol = 'features')
label_indexer = StringIndexer(inputCol='measurement_status', outputCol='indexed_label')

# Apply transformations
eq_df_labelled = label_indexer.fit(eq_df).transform(eq_df)
eq_df_labelled_featured = assembler.transform(eq_df_labelled)

# Split into training and testing datasets
(training_data, test_data) = eq_df_labelled_featured.randomSplit([0.75, 0.25])

# Create a decision tree algorithm
dtree = DecisionTreeClassifier(
    labelCol ='indexed_label',
    featuresCol = 'features',
    maxDepth = 5,
    minInstancesPerNode=1,
    impurity = 'gini',
    maxBins=32,
    seed=None
)

# Fit classifier object to training data
dtree_model = dtree.fit(training_data)

# Save model to given directory
dtree_model.save("models/dtree")

上面的所有代码都可以正常工作,没有任何错误。问题是,当我尝试加载这个模型(在同一个或另一个 pyspark 应用程序上)时,使用:

from pyspark.ml.classification import DecisionTreeClassifier

imported_model = DecisionTreeClassifier()
imported_model.load("models/dtree")

我收到以下错误:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-4-b283bc2da75f> in <module>
      2 
      3 imported_model = DecisionTreeClassifier()
----> 4 imported_model.load("models/dtree")
      5 
      6 #lodel = DecisionTreeClassifier.load("models/dtree-test/")

~/.local/lib/python3.6/site-packages/pyspark/ml/util.py in load(cls, path)
    328     def load(cls, path):
    329         """Reads an ML instance from the input path, a shortcut of `read().load(path)`."""
--> 330         return cls.read().load(path)
    331 
    332 

~/.local/lib/python3.6/site-packages/pyspark/ml/util.py in load(self, path)
    278         if not isinstance(path, basestring):
    279             raise TypeError("path should be a basestring, got type %s" % type(path))
--> 280         java_obj = self._jread.load(path)
    281         if not hasattr(self._clazz, "_from_java"):
    282             raise NotImplementedError("This Java ML type cannot be loaded into Python currently: %r"

~/.local/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

~/.local/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
    126     def deco(*a, **kw):
    127         try:
--> 128             return f(*a, **kw)
    129         except py4j.protocol.Py4JJavaError as e:
    130             converted = convert_exception(e.java_exception)

~/.local/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o39.load.
: java.lang.UnsupportedOperationException: empty collection
    at org.apache.spark.rdd.RDD.$anonfun$first$1(RDD.scala:1439)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.RDD.first(RDD.scala:1437)
    at org.apache.spark.ml.util.DefaultParamsReader$.loadMetadata(ReadWrite.scala:587)
    at org.apache.spark.ml.util.DefaultParamsReader.load(ReadWrite.scala:465)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

我选择了这种方法,因为它也无法使用Pipeline对象。关于正在发生的事情有什么想法吗?

更新

我已经意识到这个错误只发生在我使用我的 Spark 集群(一个 master,两个 worker 使用 Spark 的独立集群管理器)时。如果我像这样设置 Spark Session(主设置为本地):

spark = SparkSession\
    .builder\
    .config(conf=conf)\
    .appName("MachineLearningTesting")\
    .master("local[*]")\
    .getOrCreate()

我没有收到上述错误。

另外,我使用的是Spark 3.0.0,难道Spark 3中的模型导入和导出仍然存在错误?

4

1 回答 1

0

有两个问题:

  1. 必须在集群中的所有节点之间启用 SSH 身份验证通信。尽管我的 Spark 集群中的所有节点都在同一个网络中,但只有主节点对工作节点进行了 SSH 身份验证,反之亦然。

  2. 该模型必须可用于集群中的所有节点。这听起来可能很明显,但我认为模型文件只需要对主节点可用,然后主节点将其传播到工作节点。换句话说,当您像这样加载模型时:

from pyspark.ml.classification import DecisionTreeClassifier

imported_model = DecisionTreeClassifier()
imported_model.load("models/dtree")

该文件/absoloute_path/models/dtree必须存在于集群中的每台机器上。这让我明白,在生产环境中,模型可能是通过外部共享文件系统访问的。

这两个步骤解决了我将 pyspark 模型加载到集群上运行的 Spark 应用程序的问题。

于 2020-10-16T07:07:21.540 回答