我正在使用带有 kafka_2.12-2.2.1 的 Spark 2.4.0
我试图了解消费者配置“max.partition.fetch.bytes”的影响。默认值为 1048576 字节 (1 MB),它限制了 1 个请求中每个分区可以获取的最大字节数。我正在评估我们是否可以增加此标志的值(以及 message.max.bytes)以给每个 Spark 执行器施加更多负载(假设我们在执行器上有足够的内存。在我的设置中,Spark Web UI 没有显示太多内存在每个执行器中使用)
假设我们有
- 让我们考虑只有 1 个批次(比如 30 秒批次间隔)有 90 条记录。
- 具有 3 个分区的 Kafka 主题“T1”
- 3 个 Spark 执行器,每个具有 1 个插槽/核心。
- 假设通过正确的压缩,消费者端可以消耗更多的数据。但是为了简单起见,我们假设所有消息都是在没有任何压缩的情况下发送的。
- 每条消息为 100k。
- 发送到主题 T1 的记录数:90
在 Spark Web UI 中,我可以看到 3 个执行程序/任务中的每一个都获得 1 个偏移量为 0 到 30 的分区。
由于每条消息的大小为 100K,因此在对 kafka 代理的 1 个请求中,每个分区只能获取 10 条消息(如 max.partition.fetch.bytes 所示)。我还可以在日志中看到,对于从每个执行程序发送到 Kafka 代理的每个分区 3 网络请求。这是来自 1 个执行者的缩短日志:
- NetworkClient:将 FETCH fetch_offset=0,partition_max_bytes=1048576 发送到 kafka 节点
- 提取器:在偏移量 0 处提取 ..
- NetworkClient:发送 FETCH fetch_offset=10,partition_max_bytes=1048576 到 kafka 节点
- Fetcher:在偏移量 10 处获取 ..
- NetworkClient:发送 FETCH fetch_offset=20,partition_max_bytes=1048576 到 kafka 节点
- Fetcher:在偏移量 20 处获取 ..
我可以在 Kafka 代理端看到这些偏移量的类似请求日志。
对于 90 条记录的流式批处理,3 个任务中的每一个都被分配为 0-30 的分区偏移量。
即3个Partitions for Topic,3个Spark Executors,3个Tasks,每个任务处理偏移0到30。
因为 max.partition.fetch.bytes 默认为 1 MB,所以每个分区数据从每个 Task 的 3 个网络请求中获取,偏移量为 0 到 30。 (0-10,10-20,20-30)
所以假设处理数据集的代码是
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
JavaDStream<String> dataStream = stream.map(x -> x.value());
dataStream.foreachRDD((rdd) -> {
JavaRDD<String> rowRDD = rdd.map(data -> data);
if(!rowRDD.isEmpty()) {
Dataset<Row> dataset = sparkSession.read().json(rowRDD);
//Save dataset in some other storage
dataset.save();
}
}
因此,只有在检索到所有分配的偏移量(到任务 i. 0 到 30)的数据后,才会执行保存操作。
或者任何增量数据(直到分配给任务的分区)已经被提取,保存操作被执行,这一直持续到所有偏移量(分配给任务)都被处理。
我相信我们可以增加 max.partition.fetch.bytes 来减少 no。假设我们有足够的带宽/内存或在另一端向 Kafka 代理发出的网络请求,使用它来限制每个获取请求的负载。
欣赏是否有人可以确认/纠正这些假设或提供更多解释。