2

我有一个主题,其中所有日志都被推送到集中主题,但如果可能的话,我想将其中一些记录过滤到单独的主题和集群中。

谢谢

4

1 回答 1

6

Kafka 流不允许使用来自不同 Kafka 集群的源和输出主题创建流。所以下面的代码对你不起作用

streamsBuilder.stream(sourceTopicName).filter(..).to(outputTopicName)

在这种情况下,它期望 outputTopicName 与主题 sourceTopicName 来自同一集群。

作为一种解决方法,为了将消息从另一个集群发送到输出主题,您可以使用另外创建的 KafkaProducer,其属性bootstrap.servers将指向外部集群和KStream.foreach()方法

streamsBuilder.stream(sourceTopicName)
    .filter((key, value) -> ..)
    .foreach((key, value) -> 
        sendMessage(kafkaProducerFromAnotherCluster, destinationTopicName, key, value);


public static void sendMessage(KafkaProducer<String, String> kafkaProducer, 
                               String destinationTopicName, String key, String value) {
    try {
        kafkaProducer.send(new ProducerRecord(destinationTopicName, key, value));
    } catch (RuntimeException ex) {
        log.error(errorMessage, ex);
    }
}

另一种选择是在您的 Kafka 集群中创建输出主题,该主题将过滤消息并在两个集群之间设置Kafka 镜像(因此消息将从一个主题复制到另一个集群的第二个主题)。

于 2018-10-28T19:14:43.010 回答