0

我试图让我的消费者动态更新其消费。

让我给你一个使用动物的更具体的例子。想象一下,我有一家宠物店,每个主题都是一种动物(例如狗、猫、鱼)。我的 Kafka 消费者的主要职责是获取我们在 Kafka 中拥有的任何日志/记录/消息,并将它们存储到数据库中。

假设我的消费者正在积极消费主题dogs并且cats一切正常,现在有一种新型动物进入商店,并且在 Kafka 集群中生成了一个新主题。如何通知我的消费者添加了新主题?

我有两个建议,我想看看你认为哪个更好?或者如果有更好的第三种选择,请告诉我。

1.) 生产者向消费者发送一个http请求,让消费者知道生产者将要创建一个新主题,以便消费者采取相应的行动。这种方法的问题在于,存在竞争条件。消费者有可能在创建主题之前尝试消费。(实际上我刚刚发现,如果我auto.topic.creation.enable设置为 true,那么竞态条件就不是问题了。)

topic_updates2.)在 Kafka 集群中创建一个额外的主题。因此,每当生产者成功向 Kafka 集群提交消息时,它都会通过 this 广播消息topic_updates,也许一个简单的字符串就可以了。消费者正在积极收听此主题更新。

3.) 我不知道,理想情况下,我希望 Kafka 能够在创建新主题时发出事件。

先感谢您

4

2 回答 2

2

消费者能够自动找到新创建的主题,您可以通过调用简单地订阅所有主题consumer.subscribe(Pattern.compile(".*"));

可以降低成本metadata.max.age.ms,让消费者更快地了解新主题。

于 2017-10-11T01:53:43.070 回答
0

您可以使用新的 KafkaAdminClient 并以某种方式监控主题列表并检查新添加的内容。这是一个示例代码,可为您提供主题列表(不包括内部主题):

Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
KafkaAdminClient kafkaAdminClient = (KafkaAdminClient) AdminClient.create(properties);
ListTopicsResult listTopicResult = kafkaAdminClient.listTopics();
System.out.println(listTopicResult.names().get().toString());
于 2017-10-11T17:50:59.153 回答