你可以做几件事:
您可以定义一个RecordTranslator
. ConsumerRecord
该接口允许您根据从 Kafka 读取的数据来定义 spout 将如何构造元组。
默认实现如下所示:
public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");
@Override
public List<Object> apply(ConsumerRecord<K, V> record) {
return new Values(record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value());
}
@Override
public Fields getFieldsFor(String stream) {
return FIELDS;
}
如您所见,您将获得一个ConsumerRecord
,这是一个内置在底层 Kafka 客户端库中的类型,然后必须将其转换为List<Object>
元组值。如果您想在发出数据之前对记录做一些复杂的事情,这就是您的做法。例如,如果您想将键、值和偏移量填充到随后发出的数据结构中,您可以在此处执行此操作。你像使用翻译器一样KafkaSpoutConfig.builder(bootstrapServers, "test").setRecordTranslator(myTranslator).build()
如果您只想将键/值反序列化为您自己的数据类之一,一个更好的选择是实现Deserializer
. 这将让您定义如何反序列化从 Kafka 获得的键/值。如果你想反序列化例如你自己的数据类的值,你可以使用这个接口来完成。
默认StringDeserializer
是这样的:
@Override
public String deserialize(String topic, byte[] data) {
try {
if (data == null)
return null;
else
return new String(data, encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
}
}
一旦你创建了你自己的Deserializer
,你就可以通过执行类似KafkaSpoutConfig.builder(bootstrapServers, "test").setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, YourDeserializer.class).build()
. 有一个类似的消费者属性用于设置值反序列化器。