我KafkaProducer在我的测试用例中使用,我的生产者使用schemaRegistryUrl指向我本地实例的Schema Registry. 有没有办法模拟KafkaProducer与模式注册表的连接方式?也就是说,KafkaProducer/Consumer在我的测试中没有正在运行的 Schema Registry 实例。
11387 次
3 回答
13
在 5.3.x 中,您可以找到 MOCK_URL_PREFIX = "mock://",因此只需将测试 schemaRegistryUrl 设置为前缀为“mock://”,例如:“mock://testurl”。
于 2020-01-06T22:43:08.437 回答
12
绝对有。KafkaAvroSerializer 和 KafkaAvroDeserializer 都有一个接受 SchemaRegistryClient 的构造函数。您可以使用 MockSchemaRegistryClient 作为 SchemaRegistryClient。这是一个代码片段,展示了如何做到这一点:
private MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient();
private String registryUrl = "unused";
public <T> Serde<T> getAvroSerde(boolean isKey) {
return Serdes.serdeFrom(getSerializer(isKey), getDeserializer(isKey));
}
private <T> Serializer<T> getSerializer(boolean isKey) {
Map<String, Object> map = new HashMap<>();
map.put(KafkaAvroDeserializerConfig.AUTO_REGISTER_SCHEMAS, true);
map.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
Serializer<T> serializer = (Serializer) new KafkaAvroSerializer(mockSchemaRegistryClient);
serializer.configure(map, isKey);
return serializer;
}
private <T> Deserializer<T> getDeserializer(boolean key) {
Map<String, Object> map = new HashMap<>();
map.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
map.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
Deserializer<T> deserializer = (Deserializer) new KafkaAvroDeserializer(mockSchemaRegistryClient);
deserializer.configure(map, key);
return deserializer;
}
于 2018-05-02T18:45:23.197 回答
0
您可以通过创建自己的自定义 KafkaAvroSerializer 来实现。
在您的 appication.yml 进行测试而不是 io.confluent.kafka.serializers.KafkaAvroSerializer 使用自定义类如下
producer:
value-serializer: com.project.application.custom.MycustomKafkaAvroSerializer
package com.project.application.custom;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import java.util.Map;
public class MycustomKafkaAvroSerializer extends KafkaAvroSerializer {
public MycustomKafkaAvroSerializer() {
super();
super.schemaRegistry = new MockSchemaRegistryClient();
}
public MycustomKafkaAvroSerializer(SchemaRegistryClient client) {
super(new MockSchemaRegistryClient());
}
public MycustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) {
super(new MockSchemaRegistryClient(), props);
}
}
还要向模式注册表 url 添加值,虽然它不会被使用,但它不应该留空。
properties:
schema.registry.url: http://localhost:8080
于 2021-10-21T10:54:28.183 回答