Getting a NotSerializableException when integrating Spark SQL and Spark Streaming.

My source code:
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("NumberCount");
        JavaSparkContext jc = new JavaSparkContext(sparkConf);
        JavaStreamingContext jssc = new JavaStreamingContext(jc, new Duration(2000));
        jssc.addStreamingListener(new WorkCountMonitor());

        // Build the topic -> thread-count map for the Kafka receiver
        int numThreads = Integer.parseInt(args[3]);
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topics = args[2].split(",");
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }

        JavaPairReceiverInputDStream<String, String> data =
                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
        data.print();

        // Parse each "name,age" message into a Person bean
        JavaDStream<Person> streamData = data.map(new Function<Tuple2<String, String>, Person>() {
            public Person call(Tuple2<String, String> v1) throws Exception {
                String[] stringArray = v1._2.split(",");
                Person person = new Person();
                person.setName(stringArray[0]);
                person.setAge(stringArray[1]);
                return person;
            }
        });

        // Created once, outside foreachRDD
        final JavaSQLContext sqlContext = new JavaSQLContext(jc);
        streamData.foreachRDD(new Function<JavaRDD<Person>, Void>() {
            public Void call(JavaRDD<Person> rdd) {
                JavaSchemaRDD subscriberSchema = sqlContext.applySchema(rdd, Person.class);
                subscriberSchema.registerAsTable("people");
                System.out.println("all data");
                JavaSchemaRDD names = sqlContext.sql("SELECT name FROM people");
                System.out.println("afterwards");

                List<String> males = new ArrayList<String>();
                // NumberCount.java:79 in the stack trace points at this map call
                males = names.map(new Function<Row, String>() {
                    public String call(Row row) {
                        return row.getString(0);
                    }
                }).collect();

                System.out.println("before for");
                for (String name : males) {
                    System.out.println(name);
                }
                return null;
            }
        });

        jssc.start();
        jssc.awaitTermination();
    }
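Person itself is not shown above; for reference, a trimmed sketch of the bean (the real class may differ slightly). Both fields are Strings because setAge is called with the raw token, and applySchema needs JavaBean-style getters/setters:

    import java.io.Serializable;

    // Sketch of the Person bean -- a plain serializable JavaBean
    public class Person implements Serializable {
        private String name;
        private String age;

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public String getAge() { return age; }
        public void setAge(String age) { this.age = age; }
    }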
The JavaSQLContext is also declared outside the foreachRDD loop, but I still get a NotSerializableException. Here is the full stack trace:
    14/12/23 23:49:38 ERROR JobScheduler: Error running job streaming job 1419378578000 ms.1
    org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
        at org.apache.spark.rdd.RDD.map(RDD.scala:271)
        at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:78)
        at org.apache.spark.sql.api.java.JavaSchemaRDD.map(JavaSchemaRDD.scala:42)
        at com.basic.spark.NumberCount$2.call(NumberCount.java:79)
        at com.basic.spark.NumberCount$2.call(NumberCount.java:67)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:529)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:529)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:171)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)
    Caused by: java.io.NotSerializableException: org.apache.spark.sql.api.java.JavaSQLContext
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
        ... 20 more
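Reading the trace, the failure happens when names.map(...) is cleaned (JavaSchemaRDD.map -> ClosureCleaner), and the object that cannot be serialized is the JavaSQLContext itself. My guess is that the inner anonymous Function keeps an implicit reference to the enclosing foreachRDD Function, which captured sqlContext. If that is right, a named static nested class (which has no enclosing-instance reference) should avoid the capture. A sketch of what I mean, with a hypothetical RowToName class of my own naming, not verified:

    // Sketch of a possible workaround: a static nested class has no implicit
    // Outer.this reference, so serializing it would not pull in the enclosing
    // Function or the captured sqlContext.
    static class RowToName implements Function<Row, String> {
        public String call(Row row) {
            return row.getString(0);
        }
    }

    // ...and inside foreachRDD:
    // List<String> males = names.map(new RowToName()).collect();

Is that the right diagnosis, or is something else pulling JavaSQLContext into the closure?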
Any suggestions would be greatly appreciated.