
I am trying to run the EnronMail example of the Hadoop-MongoDB connector for Spark, using the Java example code from GitHub: https://github.com/mongodb/mongo-hadoop/blob/master/examples/enron/spark/src/main/java/com/mongodb/spark/examples/enron/Enron.java. I adjusted the server name as needed and added a username and password.

The error message I get is the following:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2066)
    at org.apache.spark.rdd.RDD$$anonfun$flatMap$1.apply(RDD.scala:333)
    at org.apache.spark.rdd.RDD$$anonfun$flatMap$1.apply(RDD.scala:332)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.flatMap(RDD.scala:332)
    at org.apache.spark.api.java.JavaRDDLike$class.flatMap(JavaRDDLike.scala:130)
    at org.apache.spark.api.java.AbstractJavaRDDLike.flatMap(JavaRDDLike.scala:46)
    at Enron.run(Enron.java:43)
    at Enron.main(Enron.java:104)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: Enron
Serialization stack:
    - object not serializable (class: Enron, value: Enron@62b09715)
    - field (class: Enron$1, name: this$0, type: class Enron)
    - object (class Enron$1, Enron$1@ee8e7ff)
    - field (class: org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1, name: f$3, type: interface org.apache.spark.api.java.function.FlatMapFunction)
    - object (class org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 22 more

I then created a new class for the FlatMapFunction and made the Enron class extend it. That did not solve the problem either. Any ideas how to fix this?

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.bson.BSONObject;

import scala.Tuple2;

class FlatMapFunctionSer implements Serializable {

    static FlatMapFunction<Tuple2<Object, BSONObject>, String> flatFunc =
            new FlatMapFunction<Tuple2<Object, BSONObject>, String>() {

                @Override
                public Iterable<String> call(final Tuple2<Object, BSONObject> t) throws Exception {

                    // Pull the sender and recipient list out of the message headers.
                    BSONObject header = (BSONObject) t._2().get("headers");
                    String to = (String) header.get("To");
                    String from = (String) header.get("From");

                    // each element in the result is an individual from|to pair
                    List<String> tuples = new ArrayList<String>();

                    if (to != null && !to.isEmpty()) {
                        for (String recipient : to.split(",")) {
                            String s = recipient.trim();
                            if (s.length() > 0) {
                                tuples.add(from + "|" + s);
                            }
                        }
                    }
                    return tuples;
                }
            };
}
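For reference, the serialization stack above shows that the anonymous FlatMapFunction (Enron$1) keeps a hidden this$0 reference to the enclosing Enron object, which is not serializable. A common workaround is therefore either to make Enron implement java.io.Serializable, or to move the function into a top-level (or static nested) class so that no outer reference is captured. Below is a minimal sketch of the latter, assuming the Spark 1.x API used in the example; the class name EmailPairFunction and the RDD variable in the usage comment are made up for illustration:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.bson.BSONObject;

import scala.Tuple2;

// A top-level class has no hidden reference to Enron, and FlatMapFunction already
// extends java.io.Serializable in Spark 1.x, so only this small object is shipped
// to the executors.
public class EmailPairFunction implements FlatMapFunction<Tuple2<Object, BSONObject>, String> {

    @Override
    public Iterable<String> call(final Tuple2<Object, BSONObject> t) throws Exception {
        BSONObject header = (BSONObject) t._2().get("headers");
        String to = (String) header.get("To");
        String from = (String) header.get("From");

        // Emit one "from|to" string per recipient.
        List<String> pairs = new ArrayList<String>();
        if (to != null && !to.isEmpty()) {
            for (String recipient : to.split(",")) {
                String s = recipient.trim();
                if (s.length() > 0) {
                    pairs.add(from + "|" + s);
                }
            }
        }
        return pairs;
    }
}

// In Enron.run() it would then be used along the lines of:
//   JavaRDD<String> pairs = mongoRDD.flatMap(new EmailPairFunction());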

1 Answer


The problem was solved by including mongo-hadoop-spark-2.0.2.jar in the invocation (a sketch of such a spark-submit call is shown after the pom) and by using the following pom:

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>1.5.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>1.5.1</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.14</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.mongodb.mongo-hadoop/mongo-hadoop-core -->
    <dependency>
        <groupId>org.mongodb.mongo-hadoop</groupId>
        <artifactId>mongo-hadoop-core</artifactId>
        <version>1.4.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.mongodb/bson -->
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>bson</artifactId>
        <version>3.4.2</version>
    </dependency>
</dependencies>
</project>
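As a sketch of what including the jar in the invocation could look like with spark-submit (the paths, the master setting, and the application jar name are placeholders, not taken from the original setup):

spark-submit \
    --class Enron \
    --master local[*] \
    --jars /path/to/mongo-hadoop-spark-2.0.2.jar \
    /path/to/enron-example.jar

The --jars option puts the connector jar on the classpath of the driver and the executors, so the connector classes are available at run time.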
answered 2017-04-04T09:15:01.970