5

如果我的 Kafka 主题收到类似的记录

CHANNEL | VIEWERS | .....
ABC     |  100    | .....
CBS     |  200    | .....

我有 Spark 结构化流代码来读取和处理 Kafka 记录,如下所示:

val spark = SparkSession 
      .builder 
      .appName("TestPartition") 
      .master("local[*]") 
      .getOrCreate() 

    import spark.implicits._ 

    val dataFrame = spark 
      .readStream 
      .format("kafka") 
      .option("kafka.bootstrap.servers", 
      "1.2.3.184:9092,1.2.3.185:9092,1.2.3.186:9092") 
      .option("subscribe", "partition_test") 
      .option("failOnDataLoss", "false") 
      .load() 
      .selectExpr("CAST(value AS STRING)") 
      // I will use a custom UDF to transform to a specific object

目前,我使用 foreachwriter 处理记录如下:

val writer = new ForeachWriter[testRec] {
    def open(partitionId: Long, version: Long): Boolean = {
      true
    }
    def process(record: testRec) = {
      handle(record)
    }
    def close(errorOrNull: Throwable): Unit = {
    }
  }

  val query = dataFrame.writeStream
    .format("console")
    .foreach(writer)
    .outputMode("append")
    .start()

代码工作得很好。但是,我想做的是按通道对传入数据进行分区,以便每个工作人员负责特定的通道,并且我在 handle() 块内进行与该通道相关的内存计算。那可能吗 ?如果是,我该怎么做?

4

1 回答 1

1

代码原样handle在记录级别应用该方法,并且独立于记录的分区。

我看到了两个选项来确保同一通道的所有消息都将在同一执行程序上处理:

  1. 如果您可以控制 KafkaProducer 将数据生成到主题“partition_test”中,则可以将值设置channel为 Kafka 消息的键。默认情况下,KafkaProducer 使用密钥来定义数据写入的分区。这将确保具有相同键的所有消息都将落在同一个 Kafka 主题分区中。由于使用 Kafka 主题的 Spark Structured Streaming 作业将匹配 Kafka 分区,因此您的结果dataFrame将具有与 Kafka 主题相同数量的分区,并且同一通道的所有消息都在同一分区中。

  2. 正如评论中已经写的那样,您可以通过 do 简单地根据列的值重新分区,dataFrame其中是分区数。这样,具有相同通道的所有记录都将落在同一个分区中,因此在同一个执行器上处理。channeldataFrame.repartition(n, col("columnName"))n

两个重要说明:

  • 获取分区(数据帧或 Kafka 主题)的所有权需要额外注意,因为您最终可能会遇到所谓的“数据倾斜”。与只有少量消息的分区相比,当您拥有包含大量消息的分区时会发生数据倾斜。这将对您的整体表现产生负面影响。

  • 只要您使用的是foreach输出接收器,无论如何在记录级别处理数据时,您的数据如何分区都无关紧要。如果您正在寻找更多控制,您可能宁愿使用接收foreachBatch器(在 Spark 2.4+ 中可用)。foreachBatch 输出接收器使您可以控制每个微批处理的批处理 Dataframe,您可以使用foreachPartitions或执行基于分区的逻辑mapPartitions

于 2021-03-04T15:42:00.797 回答