我是弹簧整合 kafka 的新手,我了解 kafka-oubound-channel 适配器。但是有没有一种方法可以让我无需在上下文 xml 中设置而以编程方式创建主题?
即:根据我给转换器的消息,我想将消息发布到为此消息类型创建的 kafka 主题。
更新:
下面是我最终做的事情。将欢迎任何更好的解决方案。
<int:channel id="inputForSolrPublish"></int:channel>
<int:service-activator input-channel="inputForSolrPublish"
ref="solrMasterListRouter" >
-->
private void postMessageToMasterSpecifcTopics(final List<String> topicNames,
final String brokerList,
final Message<?> message) throws Exception {
for (String topicName : topicNames) {
createProducerContext(topicName,
brokerList).send(topicName,
message.getHeaders()
.get(KafkaHeaders.MESSAGE_KEY),
message);
}
}
private KafkaProducerContext<String, String> createProducerContext(final String topicName,
final String brokerList) throws Exception {
KafkaProducerContext<String, String> kafkaProducerContext = new KafkaProducerContext<String, String>();
AvroReflectDatumBackedKafkaEncoder<String> kafkaReflectionEncoder = new AvroReflectDatumBackedKafkaEncoder<>(String.class);
AvroSpecificDatumBackedKafkaEncoder<String> kafkaSpecificEncoder = new AvroSpecificDatumBackedKafkaEncoder<>(String.class);
// Encoder<String> encoder = new
// org.springframework.integration.kafka.serializer.common.StringEncoder<String>();
ProducerMetadata<String, String> producerMetadata = new ProducerMetadata<String, String>(topicName);
producerMetadata.setValueClassType(String.class);
producerMetadata.setKeyClassType(String.class);
producerMetadata.setValueEncoder(kafkaSpecificEncoder);
producerMetadata.setKeyEncoder(kafkaReflectionEncoder);
producerMetadata.setAsync(true);
Properties props = buildProducerConfigProperties();
ProducerFactoryBean<String, String> producer = new ProducerFactoryBean<String, String>(producerMetadata,
brokerList,
props);
ProducerConfiguration<String, String> config = new ProducerConfiguration<String, String>(producerMetadata,
producer.getObject());
kafkaProducerContext.setProducerConfigurations(Collections.singletonMap(topicName,
config));
return kafkaProducerContext;
}
private Properties buildProducerConfigProperties() {
Properties props = new Properties();
props.put("topic.metadata.refresh.interval.ms",
"3600000");
props.put("message.send.max.retries",
"5");
props.put("tsend.buffer.bytes",
"5242880");
return props;
}