10

I want to use Spark Structured Streaming to read from a secure kafka. This means that I will need to force a specific group.id. However, as is stated in the documentation this is not possible. Still, in the databricks documentation https://docs.azuredatabricks.net/spark/latest/structured-streaming/kafka.html#using-ssl, it says that it is possible. Does this only refer to the azure cluster?

Also, by looking at the documentation of the master branch of the apache/spark repo https://github.com/apache/spark/blob/master/docs/structured-streaming-kafka-integration.md, we can understand that such functionality is intended to be added at later spark releases. Do you know of any plans of such a stable release, that is going to allow setting that consumer group.id?

If not, are there any workarounds for Spark 2.4.0 to be able to set a specific consumer group.id?

4

4 回答 4

6

目前(v2.4.0)这是不可能的。

您可以在 Apache Spark 项目中检查以下行:

https://github.com/apache/spark/blob/v2.4.0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L81 -生成 group.id

https://github.com/apache/spark/blob/v2.4.0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L534 -在用于创建的属性中设置它KafkaConsumer

在 master 分支中,您可以找到修改,以设置前缀或特定group.id

https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L83 -生成组.id 基于组前缀 ( groupidprefix)

https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L543 -之前设置生成的 groupId,如果kafka.group.id没有传入属性

于 2019-03-26T11:51:50.100 回答
2

自 Spark 3.0.0

根据结构化 Kafka 集成指南,您可以提供 ConsumerGroup 作为选项kafka.group.id

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("kafka.group.id", "myConsumerGroup")
  .load()

但是,Spark 不会提交任何偏移量,因此您的 ConsumerGroups 的偏移量不会存储在 Kafka 的内部主题__consumer_offsets中,而是存储在 Spark 的检查点文件中。

能够设置它意味着使用基于角色的访问控制group.id来处理 Kafka 的最新功能授权,您的消费者组通常需要遵循命名约定。

此处kafka.group.id讨论并解决了 Spark 3.x 应用程序设置的完整示例。

于 2020-10-12T07:25:02.187 回答
1

现在使用 spark3.0,您可以为 kafka https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#kafka-specific-configurations指定 group.id

于 2020-06-27T20:22:04.647 回答
0

结构化流媒体指南似乎对此非常明确:

请注意,无法设置以下 Kafka 参数,Kafka 源或接收器将抛出异常:

group.id:Kafka 源将自动为每个查询创建一个唯一的组 ID。

auto.offset.reset:设置源选项 startingOffsets 以指定从哪里开始。

于 2019-12-10T13:43:23.537 回答