我在我们的 play scala 项目中开发响应式 kafka,在项目中我们创建了 5 个由消费者组订阅并且运行良好的主题,现在的问题是我创建了一个新主题,我如何将此主题添加到现有的消费者组(是有可能)我的代码是:
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers(bootStrapServer)
.withGroupId(groupId).withPollInterval(100 millis)
Consumer.committableSource(consumerSettings, Subscriptions.topics(topicList))
.groupedWithin(10, 15 seconds)
.map({
group =>
var offSetBatch = CommittableOffsetBatch.empty
val sessionList = group.toList.map { eachItem =>
offSetBatch = offSetBatch.updated(eachItem.committableOffset)
Json.parse(eachItem.record.value()).as[cityModel]
}
processRecords(cityList)
offSetBatch
}).mapAsync(1)(_.commitScaladsl())
.toMat(Sink.ignore)(Keep.both)
.run()
有什么方法可以向消费者添加主题