5

我遇到了几个可序列化的异常,我在 Flink 的互联网和文档上做了一些搜索;有一些著名的解决方案,如瞬态、扩展可序列化等。每次异常的起源都很清楚,但就我而言,我无法找到它没有序列化的确切位置。

问:我应该如何调试这种异常?

斯卡拉:

class executor ( val sink: SinkFunction[List[String]] {
    def exe(): Unit = {
        xxx.....addSink(sinks)
    }
}

B.scala:

class Main extends App {
  def createSink: SinkFunction[List[String]] = new StringSink()

  object StringSink {
    // static
    val stringList: List[String] = List()
  }

  // create a testing sink
  class StringSink extends SinkFunction[List[String]] {
    override def invoke(strs: List[String]): Unit = {
        // add strs into the variable "stringList" of the compagin object StringSink
    }
  }

  new executor(createSink()).exe()

  // then do somethings with the strings
}

例外是:

SinkFunction的实现是不可序列化的。该对象可能包含或引用不可序列化的字段。

我发现了两个可疑点:

  1. 的实例StringSink被传递到另一个文件中。
  2. 在 类中StringSink,它使用了stringList 它的 compagin 对象的静态变量。
4

2 回答 2

3

我遇到了类似的问题。过去需要很长时间才能找出哪些成员/对象不可序列化。异常日志并没有真正的帮助。

帮助我的是以下 JVM 选项,它可以在异常跟踪中启用更多详细信息。

启用此选项...

-Dsun.io.serialization.extendedDebugInfo=true

于 2018-05-04T14:01:57.393 回答
2

我的第一个猜测是您在 StringSink 中没有无参数构造函数

POJO 类型的规则从这里剪辑

如果满足以下条件,Flink 将数据类型识别为 POJO 类型(并允许“按名称”字段引用):

  1. 该类是公共且独立的(没有非静态内部类)
  2. 该类有一个公共的无参数构造函数
  3. 类(和所有超类)中的所有非静态、非瞬态字段要么是公共的(且非最终的),要么具有遵循 Java bean 的 getter 和 setter 命名约定的公共 getter- 和 setter- 方法。

只需添加一个无参数构造函数,然后重试

    class StringSink extends SinkFunction[List[String]] {
        public StringSink() {
        }
        
        @override def invoke(strs: List[String]): Unit = {
            // add strs into the variable "stringList" of the compagin object StringSink
        }
}
于 2018-04-24T21:45:27.637 回答