3

我的 Kafka 主题有三个分区,我想知道我是否可以从三个分区中的一个分区中读取。我的消费者是 spark 结构化流应用程序。

下面是我在 spark 中现有的 kafka 设置。

  val inputDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", topic)
  .option("startingOffsets", "latest")
  .load()
4

2 回答 2

4

这是从特定分区读取的方法。

 val inputDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("assign", """{"topic":[0]}""") 
  .option("startingOffsets", "latest")
  .load()

PS:从多个分区而不是 1--> """{"topic":[0,1,2..n]}""" 读取

于 2019-02-15T07:06:52.887 回答
1

同样,您如何写入特定分区。我试过这个,但它不起作用。

        someDF
          .selectExpr("key", "value")
          .writeStream
          .format("kafka")
          .option("kafka.bootstrap.servers", kafkaServers)
          .option("topic", "someTopic")
          .option("partition", partIdx)
          .start()
于 2020-06-15T23:26:35.183 回答