In a Spark Scala project inside a Zeppelin notebook, calling the foreachRDD output operation causes a NotSerializableException.

This is a streaming application that collects data over windows, so I have to enable checkpointing:

val hadoopConf = sc.hadoopConfiguration
// Use the native S3 filesystem for both s3:// and s3n:// URIs
hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3n.awsAccessKeyId", "xxx")
hadoopConf.set("fs.s3n.awsSecretAccessKey", "xxx")

// Windowed operations require a checkpoint directory
streamingCtx.checkpoint("s3n://mybucket/data/checkpoints")
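
For completeness, the rest of the setup looks roughly like this (simplified: the socket source matches the SocketInputDStream in the trace below, but the host, port, batch interval, parsing, and window/slide durations shown here are placeholders):

import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

// Placeholder batch interval; sc is the SparkContext provided by Zeppelin
val streamingCtx = new StreamingContext(sc, Seconds(10))

// Socket source (the stack trace shows a SocketInputDStream); host/port are placeholders
val lines = streamingCtx.socketTextStream("localhost", 9999)

// Count events per plug over a one-hour window; parsing and durations are simplified
val countPerPlug1h = lines
  .map(line => (line.split(",")(0), 1L))
  .reduceByKeyAndWindow(_ + _, Minutes(60), Minutes(5))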

Here is the offending line (without it, everything works fine):

countPerPlug1h.foreachRDD(rdd => rdd.toDF().registerTempTable("test6"))

I get the following error:

java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
org.apache.spark.streaming.StreamingContext
Serialization stack:
    - object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@13d5321)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: streamingCtx, type: class org.apache.spark.streaming.StreamingContext)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@50a1a698)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@1d4c88f7)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC@257426e0)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC@125ec828)
    - field (class: $iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC@48950383)
    - field (class: $iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC, $iwC$$iwC$$iwC@412cdee8)
    - field (class: $iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC)
    - object (class $iwC$$iwC, $iwC$$iwC@1d8af6ae)
    - field (class: $iwC, name: $iw, type: class $iwC$$iwC)
    - object (class $iwC, $iwC@1a73298c)
    - field (class: $line36.$read, name: $iw, type: class $iwC)
    - object (class $line36.$read, $line36.$read@b6c225e)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: $VAL184, type: class $line36.$read)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@3bcc2573)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@21c54459)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, <function1>)
    - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, name: cleanedF$1, type: interface scala.Function1)
    - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, <function2>)
    - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
    - object (class org.apache.spark.streaming.dstream.ForEachDStream, org.apache.spark.streaming.dstream.ForEachDStream@5697482b)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 16)
    - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
    - object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@5697482b, org.apache.spark.streaming.dstream.ForEachDStream@4b0a272f))
    - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
    - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files 

])
    - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
    - object (class org.apache.spark.streaming.dstream.SocketInputDStream, org.apache.spark.streaming.dstream.SocketInputDStream@3cbc22e2)
    - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
    - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files 

])
    - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
    - object (class org.apache.spark.streaming.dstream.SocketInputDStream, org.apache.spark.streaming.dstream.SocketInputDStream@3d84a839)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 16)
    - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
    - object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.spark.streaming.dstream.SocketInputDStream@3d84a839, org.apache.spark.streaming.dstream.SocketInputDStream@3cbc22e2))
    - writeObject data (class: org.apache.spark.streaming.DStreamGraph)
    - object (class org.apache.spark.streaming.DStreamGraph, org.apache.spark.streaming.DStreamGraph@27cc3d89)
    - field (class: org.apache.spark.streaming.Checkpoint, name: graph, type: class org.apache.spark.streaming.DStreamGraph)
    - object (class org.apache.spark.streaming.Checkpoint, org.apache.spark.streaming.Checkpoint@56913019)
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:566)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:602)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:601)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:53)
    at $iwC$$iwC$$iwC.<init>(<console>:55)
    at $iwC$$iwC.<init>(<console>:57)
    at $iwC.<init>(<console>:59)
    at <init>(<console>:61)
    at .<init>(<console>:65)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.zeppelin.spark.SparkInterpreter.interpretInput(SparkInterpreter.java:658)
    at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:623)
    at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:616)
    at org.apache.zeppelin.interpreter.ClassloaderInterpreter.interpret(ClassloaderInterpreter.java:57)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:276)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:170)
    at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:118)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

But I need this table for the demo.

So my question is: is it possible to keep this line of code out of serialization?
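
The only workaround I can think of is the pattern from the Spark Streaming programming guide: obtain the SQLContext from the RDD inside the closure instead of importing the notebook-level implicits, so the closure no longer references streamingCtx through the REPL wrapper. A minimal sketch (assuming Spark 1.5+, where SQLContext.getOrCreate is available):

import org.apache.spark.sql.SQLContext

countPerPlug1h.foreachRDD { rdd =>
  // Obtain the SQLContext from the RDD's SparkContext so that nothing
  // from the enclosing notebook cell (e.g. streamingCtx) is captured
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._
  rdd.toDF().registerTempTable("test6")
}

But I am not sure whether the ClosureCleaner can fully sever the REPL's $iwC chain in Zeppelin, which is why I am asking.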

EDIT

I found a mailing-list message that seems to describe the same problem: https://mail-archives.apache.org/mod_mbox/spark-user/201510.mbox/%3CCAOADwJHhW7Rtv6sNpd3_y5Q12Uu6NEYLqgdT2H-WcnOf+3Aa-g@mail.gmail.com%3E

Can anyone provide a clue?
