
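I'm seeing a problem when running a job on one particular small cluster and on my local machine. The same job runs smoothly on larger machines. I'm using: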

  • com.twitter "chill-protobuf" 0.7.6
    • exclude com.esotericsoftware.kryo "kryo"
  • com.google.protobuf "protobuf-java" 3.18.1
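For reference, here is the dependency list above written out as `build.sbt` entries. This is a sketch that assumes sbt; the question doesn't show the actual build file. Flink's serializer documentation lists the same chill-protobuf version with chill's transitive Kryo excluded (presumably so it doesn't clash with the Kryo that Flink ships). It pairs that dependency with a registration such as `env.getConfig.registerTypeWithKryoSerializer(classOf[MyProto], classOf[ProtobufSerializer])`, where `MyProto` is a placeholder for the generated message class.

```scala
// build.sbt (assumed layout): the two dependencies from the list above
libraryDependencies ++= Seq(
  // chill-protobuf provides com.twitter.chill.protobuf.ProtobufSerializer;
  // its transitive Kryo artifact is excluded
  ("com.twitter" % "chill-protobuf" % "0.7.6")
    .exclude("com.esotericsoftware.kryo", "kryo"),
  "com.google.protobuf" % "protobuf-java" % "3.18.1"
)
```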

I'm also mounting a volume for RocksDB:

volumeMounts:
  - name: rocksdb-volume
    volume:
      emptyDir:
        sizeLimit: 5Gi
      name: rocksdb-volume
    volumeMount:
      mountPath: /opt/flink/rocksdb

After logging into one of the task managers, I can see that the path /opt/flink/rocksdb holds 1GB, and k8s doesn't report DiskPressure.

And I'm using the following resources:

numberOfTaskManagers: 2
parallelism: 4
resources:
   taskmanager:
      cpu: 0.5
      memory: 6G

Compared with other clusters that run fine under a heavier load, two TMs with 6G each should be more than enough.
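A back-of-the-envelope check of the resources above follows. It assumes the job needs 4 slots in total (parallelism 4 with Flink's default slot sharing), spread evenly over the two TMs, which means `taskmanager.numberOfTaskSlots` = 2. The spec doesn't show that setting, so this is an assumption:

```scala
object SlotBudget {
  // Values copied from the deployment spec above
  val numberOfTaskManagers = 2
  val parallelism = 4
  val cpuPerTm = 0.5
  val memoryPerTmGb = 6.0

  // Assumption: slots are spread evenly, rounding up, so parallelism 4 on 2 TMs gives 2 slots per TM
  val slotsPerTm: Int = (parallelism + numberOfTaskManagers - 1) / numberOfTaskManagers
  // CPU share available to each slot
  val cpuPerSlot: Double = cpuPerTm / slotsPerTm
  // Upper bound per slot: ignores JVM, network and framework overhead inside the TM
  val memoryPerSlotGb: Double = memoryPerTmGb / slotsPerTm
}
```

By this rough estimate, memory is generous (at most about 3G per slot), while each slot gets only a quarter of a CPU.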

I get the following error:

[FlinkOperatorName] (2/2)#15 (c2f42784dab9c9046192b067c0115499) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: Could not create class [ProtobufClassName]
    at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76)
    at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
    at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:76)
    at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:29)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:126)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:95)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left.
    at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128)
    at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:314)
    at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73)
    ... 23 more
Caused by: java.io.EOFException: No more bytes left.
    ... 26 more

The combination of Protobuf, a Kryo read, and an EOFException suggests that the problem might be in reading a checkpoint or savepoint.
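The `Caused by` chain above (`ProtobufSerializer.read` → `Input.readBytes` → `EOFException: No more bytes left.`) is the kind of failure a length-prefixed read produces when the input ends before the announced number of bytes has arrived. The outer frames (`AbstractStreamTaskNetworkInput`, `SpillingAdaptiveSpanningRecordDeserializer`) sit on the path that deserializes records coming from an upstream task. As far as I can tell from the chill source, `ProtobufSerializer` writes the message length followed by the message bytes. The sketch below reproduces only that framing idea with plain `java.io` streams, using a fixed 4-byte length instead of Kryo's variable-length int, so it illustrates the mechanism rather than reproducing chill's code:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, EOFException}

object LengthPrefixedFrame {
  // Write a frame as [4-byte length][payload bytes]
  def write(payload: Array[Byte]): Array[Byte] = {
    val buf = new ByteArrayOutputStream()
    val out = new DataOutputStream(buf)
    out.writeInt(payload.length)
    out.write(payload)
    out.flush()
    buf.toByteArray
  }

  // Read a frame back; readFully throws EOFException if fewer bytes remain than the header announced
  def read(frame: Array[Byte]): Array[Byte] = {
    val in = new DataInputStream(new ByteArrayInputStream(frame))
    val size = in.readInt()
    val payload = new Array[Byte](size)
    in.readFully(payload)
    payload
  }

  // Same as read, but reports the EOF as a value so the failure mode is easy to inspect
  def tryRead(frame: Array[Byte]): Either[String, Array[Byte]] =
    try Right(read(frame))
    catch { case _: EOFException => Left("EOFException") }
}
```

Truncating a valid frame by even two bytes makes the read fail in the same way as in the trace: the header promises more bytes than the stream still holds.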

What should I look at?


1 Answer


The problem was caused by bad tombstones that had been written to the Kafka topic this job uses as input.

Kafka distinguishes between a null value and an empty ("") value. I was mistakenly sending "" instead of null.
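In Kafka, a tombstone is a record whose value is null; with kafka-clients that is, for example, `new ProducerRecord(topic, key, null)`. An empty string serializes to a zero-length byte array, which is an ordinary non-null value, so a consumer that doesn't check for it hands zero bytes to the parser. One defensive option on the consuming side is to skip both cases. `TombstoneGuard.decodeValue` below is a hypothetical helper, not part of Flink or Kafka; in a real job the `parse` argument would be something like the generated Protobuf message's `parseFrom`.

```scala
object TombstoneGuard {
  // Hypothetical helper: decode a Kafka record value, treating both a real tombstone (null)
  // and an accidental empty payload ("" serialized to zero bytes) as "no value"
  def decodeValue[T](value: Array[Byte], parse: Array[Byte] => T): Option[T] =
    if (value == null || value.isEmpty) None
    else Some(parse(value))
}
```

Returning `Option` lets the caller drop such records (for example with a `flatMap`) instead of creating a message from an empty payload.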


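Judging from the stack trace, the problem doesn't occur in the deserialization code but somewhere else. I'm not sure how it ends up building a Protobuf object that Kryo can't serialize.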

answered 2022-02-15T14:08:16.850