2

我正在使用带有 Kafka 的 Spark Structured 流,并且主题被订阅为模式:

option("subscribePattern", "topic.*")

// Subscribe to a pattern
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()

一旦我开始工作并列出了一个新主题topic.new_topic,该工作就不会自动开始收听新主题,它需要重新启动。

有没有办法在不重新启动作业的情况下自动订阅新模式?

火花:3.0.0

4

1 回答 1

3

KafkaConsumer 的默认行为是每 5 分钟检查一次是否有新的分区要使用。此配置是通过Consumer 配置设置的

metadata.max.age.ms:以毫秒为单位的时间段,在此之后我们强制刷新元数据,即使我们没有看到任何分区领导层更改以主动发现任何新的代理或分区。

根据Kafka 特定配置的 Spark + Kafka 集成指南,您可以使用前缀设置此配置,kafka.如下所示:

.option("kafka.metadata.max.age.ms", "1000")

通过此设置,新创建的主题将在创建后 1 秒被消耗。

(使用 Spark 3.0.0 和 Kafka Broker 2.5.0 测试)

于 2020-12-18T20:20:52.960 回答