
I am trying to import a PMML model file generated in R into a Spark context and use it to predict scores. Here is the code used in Spark:

JavaRDD<String> scoreData = data.map(new Function<String, String>() {

    @Override
    public String call(String line) throws Exception {
        String[] row = line.split(",");

        // Load and unmarshal the PMML document from HDFS
        FileSystem fs = FileSystem.get(new Configuration());
        FSDataInputStream inStr = fs.open(new Path("PATH_TO_PMML_FILE"));
        Source transformedSource = ImportFilter.apply(new InputSource(inStr));
        PMML pmml = JAXBUtil.unmarshalPMML(transformedSource);
        System.out.println(pmml.getModels().get(0).getModelName());

        // Build an evaluator for the model
        ModelEvaluatorFactory modelEvaluatorFactory = ModelEvaluatorFactory.newInstance();
        ModelEvaluator<?> modelEvaluator = modelEvaluatorFactory.newModelManager(pmml);
        System.out.println(modelEvaluator.getSummary());
        Evaluator evaluator = (Evaluator) modelEvaluator;

        // row = {contact_id, label, features...}; prepare the feature
        // columns (f1 - f16) as active fields for the evaluator
        List<FieldName> activeFields = evaluator.getActiveFields();
        Map<FieldName, FieldValue> arguments = new LinkedHashMap<FieldName, FieldValue>();
        StringBuilder strBld = new StringBuilder();
        strBld.append(row[0]);
        for (int i = 3; i <= row.length - 1; i++) {
            FieldValue activeValue = evaluator.prepare(activeFields.get(i - 3), Double.parseDouble(row[i]));
            arguments.put(activeFields.get(i - 3), activeValue);
        }

        // Score the record; this evaluate call is where the exception below is thrown
        Map<FieldName, ?> results = evaluator.evaluate(arguments);
        strBld.append(",").append(results.toString());
        return strBld.toString();
    }
});

The code runs fine in a plain core-Java environment (without a Spark context), but running it as above throws the following exception:

java.lang.NoSuchMethodError: com.google.common.collect.Range.closed(Ljava/lang/Comparable;Ljava/lang/Comparable;)Lcom/google/common/collect/Range;
at org.jpmml.evaluator.Classification$Type.<clinit>(Classification.java:278)
at org.jpmml.evaluator.ProbabilityDistribution.<init>(ProbabilityDistribution.java:26)
at org.jpmml.evaluator.GeneralRegressionModelEvaluator.evaluateClassification(GeneralRegressionModelEvaluator.java:333)
at org.jpmml.evaluator.GeneralRegressionModelEvaluator.evaluate(GeneralRegressionModelEvaluator.java:107)
at org.jpmml.evaluator.ModelEvaluator.evaluate(ModelEvaluator.java:266)
at org.zcoe.spark.pmml.PMMLSpark_2$1.call(PMMLSpark_2.java:146)
at org.zcoe.spark.pmml.PMMLSpark_2$1.call(PMMLSpark_2.java:1)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:999)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

The problem appears to be a compatibility issue with the Guava JAR needed to run the code. I removed every JAR containing the com.google.common.collect.Range class from Spark's classpath, but the same problem persists.
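To confirm which Guava JAR the executors actually load, one can print the code source of the Range class from inside the map function (a diagnostic sketch, not part of the original code; the output lands in the executor's stdout log):

// Diagnostic sketch: print the JAR that com.google.common.collect.Range
// was actually loaded from on this executor JVM.
System.out.println(com.google.common.collect.Range.class
        .getProtectionDomain().getCodeSource().getLocation());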

The Spark job details are as follows:

spark-submit --jars ./lib/pmml-evaluator-1.2.0.jar,./lib/pmml-model-1.2.2.jar,./lib/pmml-manager-1.1.20.jar,./lib/pmml-schema-1.2.2.jar,./lib/guava-15.0.jar --class

[Stage 0:> (0 + 2) / 2]15/06/26 14:39:15 ERROR YarnScheduler: Lost executor 1 on hslave2: remote Akka client disassociated
15/06/26 14:39:15 ERROR YarnScheduler: Lost executor 2 on hslave1: remote Akka client disassociated
[Stage 0:> (0 + 2) / 2]15/06/26 14:39:33 ERROR YarnScheduler: Lost executor 4 on hslave1: remote Akka client disassociated
15/06/26 14:39:33 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, hslave1): ExecutorLostFailure (executor 4 lost)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Please let me know if I have made any mistakes.


1 Answer


You should let Spark and JPMML each have their own version of the Guava library. Modifying the base Spark installation is not a good idea when you can achieve your goal simply by repackaging your Spark application.

If you move your Spark application to Apache Maven, you can use the relocation feature of the Maven Shade Plugin to move JPMML's version of the Guava library to another package, e.g. org.jpmml.com.google. The example application of the JPMML-Cascading project does exactly that.
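A minimal sketch of what that relocation could look like in the application's pom.xml (the plugin version is illustrative; the shaded package name follows the suggestion above):

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.3</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <relocations>
                    <!-- Rewrite the Guava classes bundled into the uber-JAR so
                         they cannot clash with the Guava on Spark's classpath -->
                    <relocation>
                        <pattern>com.google</pattern>
                        <shadedPattern>org.jpmml.com.google</shadedPattern>
                    </relocation>
                </relocations>
            </configuration>
        </execution>
    </executions>
</plugin>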

Additionally, the benefit of moving to Apache Maven is that your Spark application will be delivered as a single uber-JAR file, which greatly simplifies its deployment. For example, right now you are specifying pmml-manager-1.1.20.jar on the command line, which is not needed.
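Once the uber-JAR is built, the submit command could shrink to something like the following (the JAR name myapp-1.0-shaded.jar is hypothetical; the main class is taken from the stack trace above):

spark-submit --master yarn-cluster --class org.zcoe.spark.pmml.PMMLSpark_2 myapp-1.0-shaded.jar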

answered 2015-06-26T18:39:46