我有一个主题,其中所有日志都被推送到集中主题,但如果可能的话,我想将其中一些记录过滤到单独的主题和集群中。
谢谢
我有一个主题,其中所有日志都被推送到集中主题,但如果可能的话,我想将其中一些记录过滤到单独的主题和集群中。
谢谢
Kafka 流不允许使用来自不同 Kafka 集群的源和输出主题创建流。所以下面的代码对你不起作用
streamsBuilder.stream(sourceTopicName).filter(..).to(outputTopicName)
在这种情况下,它期望 outputTopicName 与主题 sourceTopicName 来自同一集群。
作为一种解决方法,为了将消息从另一个集群发送到输出主题,您可以使用另外创建的 KafkaProducer,其属性bootstrap.servers
将指向外部集群和KStream.foreach()
方法。
streamsBuilder.stream(sourceTopicName)
.filter((key, value) -> ..)
.foreach((key, value) ->
sendMessage(kafkaProducerFromAnotherCluster, destinationTopicName, key, value);
public static void sendMessage(KafkaProducer<String, String> kafkaProducer,
String destinationTopicName, String key, String value) {
try {
kafkaProducer.send(new ProducerRecord(destinationTopicName, key, value));
} catch (RuntimeException ex) {
log.error(errorMessage, ex);
}
}
另一种选择是在您的 Kafka 集群中创建输出主题,该主题将过滤消息并在两个集群之间设置Kafka 镜像(因此消息将从一个主题复制到另一个集群的第二个主题)。