我正在使用 Kafka Streams 3.0.0。使用 Spring Cloud Stream。我正在尝试了解 RocksDB 缓存的内存配置,如本Confluent 文档中所述。
我有一个包含 20 个分区并num.standby.replicas
设置为 1 的 Kafka 主题。我在本地运行同一个 Kafka Streams 应用程序的 3 个实例。为了验证 RocksDB 可以使用多少内存,我正在检查kafka_stream_state_block_cache_capacity
Spring/actuator/prometheus
端点的指标。
我的 Kafka Stream 应用程序使用 WindowStore。
inputStream
.groupByKey()
.windowedBy(tumblingWindow)
.count(Materialized.<MyKeyAvro, Long, WindowStore<Bytes, byte[]>>as(STORE_NAME)
.withKeySerde(myKeyAvroSerde)
.withValueSerde(Serdes.Long())
.withRetention(Duration.ofDays(31))
);
我的 BoundedMemoryRocksDBConfig 是:
public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {
static {
RocksDB.loadLibrary();
}
private static long lruCacheBytes = 100L * 1024L * 1024L; //100MB
private static int maxBackgroundCompactions = 10;
private static int maxBackgroundFlushes = 10;
private static int maxBackgroundJobs = 20;
private static long memtableBytes = 1024 * 1024;
private static int nMemtables = 1;
private static long writeBufferManagerBytes = 95 * 1024 * 1024;
private org.rocksdb.Cache cache = new org.rocksdb.LRUCache(lruCacheBytes);
private org.rocksdb.WriteBufferManager writeBufferManager = new org.rocksdb.WriteBufferManager(writeBufferManagerBytes, cache);
@Override
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
BlockBasedTableConfig tableConfig = (BlockBasedTableConfig)
options.tableFormatConfig();
// These three options in combination will limit the memory used by RocksDB to the size passed to the block cache (TOTAL_OFF_HEAP_MEMORY)
tableConfig.setBlockCache(cache);
tableConfig.setCacheIndexAndFilterBlocks(true);
options.setWriteBufferManager(writeBufferManager);
// These options are recommended to be set when bounding the total memory
tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
tableConfig.setPinTopLevelIndexAndFilter(true);
options.setMaxWriteBufferNumber(nMemtables);
options.setWriteBufferSize(memtableBytes);
options.setCompressionType(CompressionType.LZ4_COMPRESSION);
options.setMaxBackgroundCompactions(maxBackgroundCompactions);
options.setMaxBackgroundFlushes(maxBackgroundFlushes);
options.setMaxBackgroundJobs(maxBackgroundJobs);
options.setMaxOpenFiles(10000);
options.setTableFormatConfig(tableConfig);
}
@Override
public void close(final String storeName, final Options options) {
cache.close();
writeBufferManager.close();}
}
请注意,cache
andwriteBufferManager
变量不是静态的。
其他相关的 Spring Cloud Stream 配置:
spring.cloud.stream.kafka.streams.binder.configuration.rocksdb.config.setter=com.kafkastreamsapp.config.BoundedMemoryRocksDBConfig
spring.cloud.stream.kafka.streams.binder.configuration.num.standby.replicas=1
从这个StackOverflow 答案中可以看出,使用 WindowStore 的任务由 2 个段组成,每个段将创建一个 RocksDB 实例。
我期望的是每个分区都映射到一个任务。每个任务应该使用大约 200MB 的内存。此外,由于num.standby.replicas
设置为 1,每个任务都会被复制。因此,总共应该有:
- 40 个任务(因为 20 个分区和
num.standby.replicas
1 个) - 80 段(因为每个任务 2 段)
- 80 个 RocksDB 实例(因为每个段 1 个 RocksDB 实例)
这意味着我应该期望 Kafka Streams 应用程序的每个实例负责 40/3 = 14 个任务。每个任务应该使用 200MB 即kafka_stream_state_block_cache_capacity
metric 是 200MB。
但是通过运行应用程序,我发现任务的数量是正确的,但是每个任务占用的内存,即kafka_stream_state_block_cache_capacity
指标是 400MB 而不是 200MB。为什么会这样?
kafka_stream_state_block_cache_capacity{kafka_version="3.0.0",rocksdb_window_state_id="one.minute.window.count",spring_id="stream-builder-process",task_id="0_6",thread_id="7ed0af6a-244f-4b87-b4cf-f2f311df976c-StreamThread-1",} 4.194304E8
kafka_stream_state_block_cache_capacity{kafka_version="3.0.0",rocksdb_window_state_id="one.minute.window.count",spring_id="stream-builder-process",task_id="0_8",thread_id="7ed0af6a-244f-4b87-b4cf-f2f311df976c-StreamThread-1",} 4.194304E8
kafka_stream_state_block_cache_capacity{kafka_version="3.0.0",rocksdb_window_state_id="one.minute.window.count",spring_id="stream-builder-process",task_id="0_9",thread_id="7ed0af6a-244f-4b87-b4cf-f2f311df976c-StreamThread-1",} 4.194304E8