0

我正在尝试从 flink statefun 退出到融合 kafka。在融合的 git repo 中,为了进行模式检查并将数据放入 kafka 主题,我们需要做的就是使用带有 avro 对象的 kafka 客户端 ProducerRecord 对象。

但是在statefun中,我们需要覆盖 kafka 出口的“ProducerRecord<byte[], byte[]> serialize”方法。这会导致以下错误。

Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "bytes"

Schema registery 和 statefun kafka egress 似乎不兼容。有什么解决方法吗?

4

2 回答 2

2

此版本的有状态函数不直接支持模式注册表,但可能的解决方法很少:

  1. KafkaEgressSerializer您自己从班级连接到模式注册表。在您需要在此处发生的链接示例中。
  2. 提供您自己的 FlinkKafkaProducer 实例,该实例基于参见 AvroDeserializationSchema
  3. 在有状态函数之外管理模式,但将 Avro 记录序列化为字节。确保从传递给KafkaProducer
于 2020-06-24T15:40:00.777 回答
2

可以将 Confluent Schema Registry 与 Statefun Egress 一起使用。

为此,您首先使用模式注册表手动注册模式,然后提供按 实例序列化KafkaEgressSerializer的序列化。byte[]KafkaAvroSerializer

下面的代码是它的要点,并且符合 Igal 解决方法建议中的第一个:

public class SpecificRecordFromAvroSchemaSerializer implements KafkaEgressSerializer<SpecificRecordGeneratedFromAvroSchema> {

    private static String KAFKA_TOPIC = "kafka_topic";

    private static CachedSchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(
        "http://schema-registry:8081",
        1_000
    );
    private static KafkaAvroSerializer kafkaAvroSerializer = new KafkaAvroSerializer(schemaRegistryClient);

    static {
        try {
            schemaRegistryClient.register(
                KAFKA_TOPIC + "-value", // assuming subject name strategy is TopicNameStrategy (default)
                SpecificRecordGeneratedFromAvroSchema.getClassSchema()
            );
        } catch (IOException e) {
            e.printStackTrace();
        } catch (RestClientException e) {
            e.printStackTrace();
        }
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(SpecificRecordGeneratedFromAvroSchema specificRecordGeneratedFromAvroSchema) {
        byte[] valueData = kafkaAvroSerializer.serialize(
            KAFKA_TOPIC,
            specificRecordGeneratedFromAvroSchema
        );

        return new ProducerRecord<>(
            KAFKA_TOPIC,
            String.valueOf(System.currentTimeMillis()).getBytes(),
            valueData
        );
    }

}
于 2020-06-27T10:50:42.593 回答