我在通过 Beam读取FlinkKafkaProducer
/写入 Avro 数据时遇到了一些问题。FlinkKafkaConsumer
如果有人可以指出FlinkKafkaProducer
并FlinkKafkaConsumer
使用 Avro 模式的工作示例(不使用 Kafka 的融合版) ,那就太好了
A)BeamKafkaFlinkAvroProducerTest(生产者)
如果我直接使用 KafkaProducer(即调用productSimpleData),一切正常(仅用于测试)。通过以下步骤使用FlinkKafkaProducer
as UnboundedSource (这是我应该做的)(即我称之为generateAvroData2 ):
首先,如果我使用
AvroSerializationSchema schema = new AvroSerializationSchema(Test.class);
即基本上使用 Avro 的
org.apache.avro.specific.SpecificDatumWriter
;我面临以下错误:Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.generic.IndexedRecord at org.apache.avro.generic.GenericData.getField(GenericData.java:580) at org.apache.avro.generic.GenericData.getField(GenericData.java:595) at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:112) at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
接下来,如果我使用
TypeInformationSerializationSchema
(不管管道中的 AvroCoder 是什么),事情显然可以正常工作,因为 Kafka 测试消费者工具会打印消息:java.lang.String{"uname": "Joe", "id": 6}
B)BeamKafkaFlinkAvroConsumerTest(消费者)
我知道我们应该TypeInformationSerializationSchema
在消费者和生产者中使用,或者应该分别在消费者和生产者中使用AvroDeserializationSchema
和AvroSerializationSchema
。
但是,无论使用AvroDeserializationSchema
or TypeInformationSerializationSchema
,我都会收到以下异常:
Exception in thread "main" java.lang.NullPointerException: null value in entry: V=null
at com.google.common.collect.CollectPreconditions.checkEntryNotNull(CollectPreconditions.java:33)
at com.google.common.collect.SingletonImmutableBiMap.<init>(SingletonImmutableBiMap.java:39)
at com.google.common.collect.ImmutableBiMap.of(ImmutableBiMap.java:49)
at com.google.common.collect.ImmutableMap.of(ImmutableMap.java:70)
at org.apache.beam.sdk.coders.CoderRegistry.getDefaultOutputCoder(CoderRegistry.java:221)
可能缺少一些非常基本的东西。所有代码都在这里。