
I'm trying to deserialize Kafka events in my Flink streaming job. This is my code:

...
case class URLResponse(status: Int, domain: String, url: String, queue: String, html: String)
...
val schema: Schema = AvroSchema[URLResponse]
...
val stream = env.addSource(new FlinkKafkaConsumer[GenericRecord](
  kafkaTopic,
  ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, schemaRegistryURL),
  properties))

At runtime the job throws this exception:

...
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
Caused by: java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    ... 26 more

Process finished with exit code 1

I've read that I shouldn't let Kryo serialize the GenericRecord (its embedded Schema holds unmodifiable collections, which is exactly what the UnsupportedOperationException above trips over), but I don't know how to do that. I tried:

executionConfig.enableForceAvro()
executionConfig.disableForceKryo()

but it didn't help.
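
For context, a minimal sketch of how I'm applying those flags (assuming executionConfig comes from the environment's getConfig):

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    // Sketch of my setup (assumption: executionConfig is env.getConfig)
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val executionConfig = env.getConfig
    executionConfig.enableForceAvro()   // prefer Flink's Avro serializer for POJO types
    executionConfig.disableForceKryo()  // don't force Kryo for types Flink can handle itself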

4 Answers

Score: 3

If you can't add your source through the Java environment (perhaps you are using the StreamExecutionEnvironment.readFile method), another solution is shared at https://stackoverflow.com/a/32453031/899937; in essence:

import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer

// UnmodifiableCollection is not public, so it has to be looked up by name
val unmodifiableCollectionClass = Class.forName("java.util.Collections$UnmodifiableCollection")
env.getConfig.addDefaultKryoSerializer(unmodifiableCollectionClass, classOf[UnmodifiableCollectionsSerializer])

kryo-serializers is no longer bundled with Flink, so you have to add it as a dependency yourself.
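
If you build with sbt rather than Maven, a hypothetical equivalent coordinate (version 0.45, matching the Maven snippet in another answer here; check for a newer release) is:

    // build.sbt: kryo-serializers is the artifact that ships UnmodifiableCollectionsSerializer
    libraryDependencies += "de.javakaffee" % "kryo-serializers" % "0.45"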

Answered 2021-07-29T00:36:17.707
Score: 1

I ran into the same problem in Java; the snippet below helped me solve it:

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

// Register a Kryo serializer for the non-public UnmodifiableCollection type
// (hence the Class.forName lookup)
Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
environment.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);

You also need to add the Maven dependency that provides UnmodifiableCollectionsSerializer:

    <dependency>
        <groupId>de.javakaffee</groupId>
        <artifactId>kryo-serializers</artifactId>
        <version>0.45</version>
    </dependency>
Answered 2021-09-16T13:22:14.203
Score: 0

I had the same problem using Avro GenericRecord over a Kinesis data stream, with Scala 2.12 and Flink 1.11.4.

My solution was to add an implicit TypeInformation:

implicit val typeInfo: TypeInformation[GenericRecord] = new GenericRecordAvroTypeInfo(avroSchema)

Below is a complete code example focused on the serialization problem:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction
import org.apache.flink.streaming.api.scala._
import org.junit.Test

@Test def `test avro generic record serializer`(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val schema: String =
    """
      |{
      | "namespace": "com.mberchon.monitor.dto.avro",
      | "type": "record",
      | "name": "TestAvro",
      | "fields": [
      |  {"name": "strVal", "type": ["null", "string"]},
      |  {"name": "longVal",  "type": ["null", "long"]}
      |  ]
      |}
""".stripMargin

  val avroSchema = new Schema.Parser().parse(schema)
  val rec:GenericRecord = new GenericRecordBuilder(avroSchema)
    .set("strVal","foo")
    .set("longVal",1234L)
    .build()

  implicit val typeInfo: TypeInformation[GenericRecord] = new GenericRecordAvroTypeInfo(avroSchema)
  val _ = env.fromElements(rec,rec).addSink(new PrintSinkFunction[GenericRecord]())

  env.execute("Test serializer")
}

Coming back to your context, the following should work:

...
case class URLResponse(status: Int, domain: String, url: String, queue: String, html: String)
...
val schema: Schema = AvroSchema[URLResponse]
...
implicit val typeInfo: TypeInformation[GenericRecord] = new GenericRecordAvroTypeInfo(schema)
val stream = env.addSource(new FlinkKafkaConsumer[GenericRecord](
  kafkaTopic,
  ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, schemaRegistryURL),
  properties))
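
This works because the Scala DataStream API resolves an implicit TypeInformation for the type produced by addSource, so the implicit above makes Flink pick its built-in Avro serializer for GenericRecord instead of falling back to Kryo.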
Answered 2021-09-01T13:08:11.770
Score: 0

The exception mentioned is related to a problem in the Scala implementation of Avro deserialization; it works fine if you use the Java implementation (https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro). Passing the GenericRecordAvroTypeInfo explicitly to addSource keeps GenericRecord on Flink's Avro serializer rather than the Kryo fallback. My solution:

val javaStream = env.getJavaEnv.addSource(new FlinkKafkaConsumer[GenericRecord](
    kafkaTopic, ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, schemaRegistryURL), properties),
    new GenericRecordAvroTypeInfo(schema))
val stream = new DataStream[GenericRecord](javaStream)
Answered 2021-01-22T18:59:48.300