0

我正在尝试为 kafka avro 序列化主题创建一个 flink 消费者。我有 kafka 主题流式传输 avro 序列化数据。我可以通过 avroconsoleconsumer 看到它。

Flink 1.6.0 增加了一个,AvroDeserializationSchema但我找不到完整的用法示例。是的,有一些似乎在 1.6.0 添加类之前生成了一个 avrodeserialization 类。

我有一个通过 avro-tools 生成的 avro 类。

现在,我一直在尝试遵循现有的示例,但它们的不同之处足以让我无法继续进行下去。(我不经常用 Java 编程)

大多数使用以下某种形式

Myclass mc = new MyClass();
AvroDeserializationSchema<Myclass> ads = new AvroDeserializationSchema<> (Myclass.class);
FlinkKafkaConsumer010<Myclass> kc = new FlinkKafkaConsumer010<>(topic,ads,properties);

其中 Myclass 是通过 avro-tools jar 生成的 avro 类。这是正确的方法吗?在执行此操作并利用内部 flink 1.6.0 avrodeserializationschema 类时,我遇到了一些私有/公共访问问题。我是否必须创建一个新类并扩展 avrodeserializationschema?

4

1 回答 1

0

好的,我深入研究了 kafka 消费者 javadocs 并找到了一个提取消费 avro 流的示例。我仍然需要将 kafka 消费转换为 flinkKafkaConsumer,但下面的代码有效。

对于 io.confluent 对工作的引用,我必须向 pom 文件添加存储库和依赖项。

<repository>
  <id>confluent</id>
    <url>http://packages.confluent.io/maven/</url>
</repository>

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>3.1.1</version>
</dependency>   
public class StreamingJob {

//  static  DeserializationSchema<pendingsv> avroSchema = new AvroDeserializationSchema<pendingsv>(pendingsv.class);
    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "opssupport.alarms");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://localhost:8081");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        String topic = "pendingSVs_";
        final Consumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
        consumer.subscribe(Arrays.asList(topic));

        try {
            while (true) {
                ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }

        // execute program
        //env.execute("Flink Streaming Java API Skeleton");
    }
}
于 2018-08-27T02:11:43.453 回答