0

我在我们的 play scala 项目中开发响应式 kafka,在项目中我们创建了 5 个由消费者组订阅并且运行良好的主题,现在的问题是我创建了一个新主题,我如何将此主题添加到现有的消费者组(是有可能)我的代码是:

val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers(bootStrapServer)
      .withGroupId(groupId).withPollInterval(100 millis)

   Consumer.committableSource(consumerSettings, Subscriptions.topics(topicList))
            .groupedWithin(10, 15 seconds)
            .map({
              group =>
                var offSetBatch = CommittableOffsetBatch.empty
                val sessionList = group.toList.map { eachItem =>
                  offSetBatch = offSetBatch.updated(eachItem.committableOffset)
                  Json.parse(eachItem.record.value()).as[cityModel]
                }
                processRecords(cityList)
                offSetBatch
            }).mapAsync(1)(_.commitScaladsl())
            .toMat(Sink.ignore)(Keep.both)
            .run()

有什么方法可以向消费者添加主题

4

1 回答 1

0

在创建消费者时,我们可以给主题一个模式

Subscriptions.topicPattern("*.in") 这应该可以解决问题

于 2017-04-19T17:26:27.613 回答