0

我正在使用带有 kafka_2.12-2.2.1 的 Spark 2.4.0

我试图了解消费者配置“max.partition.fetch.bytes”的影响。默认值为 1048576 字节 (1 MB),它限制了 1 个请求中每个分区可以获取的最大字节数。我正在评估我们是否可以增加此标志的值(以及 message.max.bytes)以给每个 Spark 执行器施加更多负载(假设我们在执行器上有足够的内存。在我的设置中,Spark Web UI 没有显示太多内存在每个执行器中使用)

假设我们有

  1. 让我们考虑只有 1 个批次(比如 30 秒批次间隔)有 90 条记录。
  2. 具有 3 个分区的 Kafka 主题“T1”
  3. 3 个 Spark 执行器,每个具有 1 个插槽/核心。
  4. 假设通过正确的压缩,消费者端可以消耗更多的数据。但是为了简单起见,我们假设所有消息都是在没有任何压缩的情况下发送的。
  5. 每条消息为 100k。
  6. 发送到主题 T1 的记录数:90

在 Spark Web UI 中,我可以看到 3 个执行程序/任务中的每一个都获得 1 个偏移量为 0 到 30 的分区。

由于每条消息的大小为 100K,因此在对 kafka 代理的 1 个请求中,每个分区只能获取 10 条消息(如 max.partition.fetch.bytes 所示)。我还可以在日志中看到,对于从每个执行程序发送到 Kafka 代理的每个分区 3 网络请求。这是来自 1 个执行者的缩短日志:

  1. NetworkClient:将 FETCH fetch_offset=0,partition_max_bytes=1048576 发送到 kafka 节点
  2. 提取器:在偏移量 0 处提取 ..
  3. NetworkClient:发送 FETCH fetch_offset=10,partition_max_bytes=1048576 到 kafka 节点
  4. Fetcher:在偏移量 10 处获取 ..
  5. NetworkClient:发送 FETCH fetch_offset=20,partition_max_bytes=1048576 到 kafka 节点
  6. Fetcher:在偏移量 20 处获取 ..

我可以在 Kafka 代理端看到这些偏移量的类似请求日志。

对于 90 条记录的流式批处理,3 个任务中的每一个都被分配为 0-30 的分区偏移量。

即3个Partitions for Topic,3个Spark Executors,3个Tasks,每个任务处理偏移0到30。

因为 max.partition.fetch.bytes 默认为 1 MB,所以每个分区数据从每个 Task 的 3 个网络请求中获取,偏移量为 0 到 30。 (0-10,10-20,20-30)

所以假设处理数据集的代码是

JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
JavaDStream<String> dataStream = stream.map(x -> x.value());
dataStream.foreachRDD((rdd) -> {
   JavaRDD<String> rowRDD = rdd.map(data -> data);
   if(!rowRDD.isEmpty()) { 
      Dataset<Row> dataset =  sparkSession.read().json(rowRDD);
      //Save dataset in some other storage
      dataset.save();
   }
}
  1. 因此,只有在检索到所有分配的偏移量(到任务 i. 0 到 30)的数据后,才会执行保存操作。

  2. 或者任何增量数据(直到分配给任务的分区)已经被提取,保存操作被执行,这一直持续到所有偏移量(分配给任务)都被处理。

  3. 我相信我们可以增加 max.partition.fetch.bytes 来减少 no。假设我们有足够的带宽/内存或在另一端向 Kafka 代理发出的网络请求,使用它来限制每个获取请求的负载。

欣赏是否有人可以确认/纠正这些假设或提供更多解释。

4

0 回答 0