4

我在我的 spark 流应用程序中使用 sparkContext.broadcast 来共享 redis 连接池(JedisPool)。

像这样的代码:

lazy val redisPool = {
  val pool = Redis.createRedisPool(redisHost, redisPort)
  ssc.sparkContext.broadcast(pool)
}

Redis.createRedisPool 是:

object Redis {

  def createRedisPool(host: String, port: Int, maxIdle: Int, maxTotal: Int, timeout: Int): JedisPool = {
    val pc = new JedisPoolConfig
    pc.setMaxIdle(maxIdle)
    pc.setMaxTotal(maxTotal)
    pc.setMaxWaitMillis(timeout)
    new JedisPool(pc, host, port)
  }

  def createRedisPool(host: String, port: Int): JedisPool = {
    createRedisPool(host, port, 5, 5, 5000)
  }
}

它在本地部署模式下工作,但是当我在纱线/独立模式下运行它时

spark-submit --master "yarn-client" --class ...

会报错:

Exception in thread "main" java.io.NotSerializableException: redis.clients.jedis.JedisPool
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1165)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:329)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:210)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:83)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:68)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:36)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:809)
    at org.culiu.bd.streaming.AdSysStreaming$.redisPool$lzycompute$1(AdSysStreaming.scala:84)
    at org.culiu.bd.streaming.AdSysStreaming$.redisPool$1(AdSysStreaming.scala:82)
    at org.culiu.bd.streaming.AdSysStreaming$.main(AdSysStreaming.scala:154)
    at org.culiu.bd.streaming.AdSysStreaming.main(AdSysStreaming.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

我在我的应用程序中尝试了 set spark.serializer = org.apache.spark.serializer.KryoSerializer ,然后出现如下错误:

Exception in thread "main" com.esotericsoftware.kryo.KryoException:        java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (org.apache.spark.executor.ExecutorURLClassLoader)
factoryClassLoader (org.apache.commons.pool2.impl.GenericObjectPool)
internalPool (redis.clients.jedis.JedisPool)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:119)
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:210)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:83)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:68)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:36)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:809)
    at org.culiu.bd.streaming.AdSysStreaming$.redisPool$lzycompute$1(AdSysStreaming.scala:85)
    at org.culiu.bd.streaming.AdSysStreaming$.redisPool$1(AdSysStreaming.scala:83)
    at org.culiu.bd.streaming.AdSysStreaming$.main(AdSysStreaming.scala:155)
    at org.culiu.bd.streaming.AdSysStreaming.main(AdSysStreaming.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.ConcurrentModificationException
    at java.util.AbstractList$Itr.checkForComodification(AbstractList.java:372)
    at java.util.AbstractList$Itr.next(AbstractList.java:343)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:74)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    ... 39 more

我该如何解决这个问题?

4

1 回答 1

3

看起来这里的问题是redis.clients.jedis.JedisPool该类不可序列化。这似乎不是特定于 Spark 的问题,因为我认为任何序列化该类的尝试都会失败。

于 2014-12-03T20:59:13.970 回答