可以将 Confluent Schema Registry 与 Statefun Egress 一起使用。
为此,您首先使用模式注册表手动注册模式,然后提供按 实例序列化KafkaEgressSerializer
的序列化。byte[]
KafkaAvroSerializer
下面的代码是它的要点,并且符合 Igal 解决方法建议中的第一个:
public class SpecificRecordFromAvroSchemaSerializer implements KafkaEgressSerializer<SpecificRecordGeneratedFromAvroSchema> {
private static String KAFKA_TOPIC = "kafka_topic";
private static CachedSchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(
"http://schema-registry:8081",
1_000
);
private static KafkaAvroSerializer kafkaAvroSerializer = new KafkaAvroSerializer(schemaRegistryClient);
static {
try {
schemaRegistryClient.register(
KAFKA_TOPIC + "-value", // assuming subject name strategy is TopicNameStrategy (default)
SpecificRecordGeneratedFromAvroSchema.getClassSchema()
);
} catch (IOException e) {
e.printStackTrace();
} catch (RestClientException e) {
e.printStackTrace();
}
}
@Override
public ProducerRecord<byte[], byte[]> serialize(SpecificRecordGeneratedFromAvroSchema specificRecordGeneratedFromAvroSchema) {
byte[] valueData = kafkaAvroSerializer.serialize(
KAFKA_TOPIC,
specificRecordGeneratedFromAvroSchema
);
return new ProducerRecord<>(
KAFKA_TOPIC,
String.valueOf(System.currentTimeMillis()).getBytes(),
valueData
);
}
}