3

我在通过 Beam读取FlinkKafkaProducer/写入 Avro 数据时遇到了一些问题。FlinkKafkaConsumer

如果有人可以指出FlinkKafkaProducerFlinkKafkaConsumer使用 Avro 模式的工作示例(不使用 Kafka 的融合版) ,那就太好了

A)BeamKafkaFlinkAvroProducerTest(生产者)

如果我直接使用 KafkaProducer(即调用productSimpleData),一切正常(仅用于测试)。通过以下步骤使用FlinkKafkaProduceras UnboundedSource (这是我应该做的)(即我称之为generateAvroData2 ):

  1. 首先,如果我使用AvroSerializationSchema schema = new AvroSerializationSchema(Test.class);

    即基本上使用 Avro 的org.apache.avro.specific.SpecificDatumWriter;我面临以下错误:

    Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.generic.IndexedRecord
    at org.apache.avro.generic.GenericData.getField(GenericData.java:580)
    at org.apache.avro.generic.GenericData.getField(GenericData.java:595)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:112)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
    
  2. 接下来,如果我使用TypeInformationSerializationSchema(不管管道中的 AvroCoder 是什么),事情显然可以正常工作,因为 Kafka 测试消费者工具会打印消息:

    java.lang.String{"uname": "Joe", "id": 6}
    

B)BeamKafkaFlinkAvroConsumerTest(消费者)

我知道我们应该TypeInformationSerializationSchema在消费者和生产者中使用,或者应该分别在消费者和生产者中使用AvroDeserializationSchemaAvroSerializationSchema

但是,无论使用AvroDeserializationSchemaor TypeInformationSerializationSchema,我都会收到以下异常:

Exception in thread "main" java.lang.NullPointerException: null value in entry: V=null
at com.google.common.collect.CollectPreconditions.checkEntryNotNull(CollectPreconditions.java:33)
at com.google.common.collect.SingletonImmutableBiMap.<init>(SingletonImmutableBiMap.java:39)
at com.google.common.collect.ImmutableBiMap.of(ImmutableBiMap.java:49)
at com.google.common.collect.ImmutableMap.of(ImmutableMap.java:70)
at org.apache.beam.sdk.coders.CoderRegistry.getDefaultOutputCoder(CoderRegistry.java:221)

可能缺少一些非常基本的东西。所有代码都在这里

4

0 回答 0