0

我正在使用 Kafka Streams 3.0.0。使用 Spring Cloud Stream。我正在尝试了解 RocksDB 缓存的内存配置,如本Confluent 文档中所述。

我有一个包含 20 个分区并num.standby.replicas设置为 1 的 Kafka 主题。我在本地运行同一个 Kafka Streams 应用程序的 3 个实例。为了验证 RocksDB 可以使用多少内存,我正在检查kafka_stream_state_block_cache_capacitySpring/actuator/prometheus端点的指标。

我的 Kafka Stream 应用程序使用 WindowStore。

inputStream
   .groupByKey()
   .windowedBy(tumblingWindow)
   .count(Materialized.<MyKeyAvro, Long, WindowStore<Bytes, byte[]>>as(STORE_NAME)
      .withKeySerde(myKeyAvroSerde)
      .withValueSerde(Serdes.Long())
      .withRetention(Duration.ofDays(31))
   );

我的 BoundedMemoryRocksDBConfig 是:

public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {
    static {
        RocksDB.loadLibrary();
    }

    private static long lruCacheBytes = 100L * 1024L * 1024L; //100MB
    private static int maxBackgroundCompactions = 10;
    private static int maxBackgroundFlushes = 10;
    private static int maxBackgroundJobs = 20;
    private static long memtableBytes = 1024 * 1024;
    private static int nMemtables = 1;
    private static long writeBufferManagerBytes = 95 * 1024 * 1024;

    private org.rocksdb.Cache cache = new org.rocksdb.LRUCache(lruCacheBytes);
    private org.rocksdb.WriteBufferManager writeBufferManager = new org.rocksdb.WriteBufferManager(writeBufferManagerBytes, cache);

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) 
        options.tableFormatConfig();

        // These three options in combination will limit the memory used by RocksDB to the size passed to the block cache (TOTAL_OFF_HEAP_MEMORY)
        tableConfig.setBlockCache(cache);
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setWriteBufferManager(writeBufferManager);

        // These options are recommended to be set when bounding the total memory
        tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
        tableConfig.setPinTopLevelIndexAndFilter(true);

        options.setMaxWriteBufferNumber(nMemtables);
        options.setWriteBufferSize(memtableBytes);

        options.setCompressionType(CompressionType.LZ4_COMPRESSION);

        options.setMaxBackgroundCompactions(maxBackgroundCompactions);
        options.setMaxBackgroundFlushes(maxBackgroundFlushes);
        options.setMaxBackgroundJobs(maxBackgroundJobs);

        options.setMaxOpenFiles(10000);

        options.setTableFormatConfig(tableConfig);
    }

    @Override
    public void close(final String storeName, final Options options) {
       cache.close();
       writeBufferManager.close();}
    }

请注意,cacheandwriteBufferManager变量不是静态的。

其他相关的 Spring Cloud Stream 配置:

spring.cloud.stream.kafka.streams.binder.configuration.rocksdb.config.setter=com.kafkastreamsapp.config.BoundedMemoryRocksDBConfig
spring.cloud.stream.kafka.streams.binder.configuration.num.standby.replicas=1

从这个StackOverflow 答案中可以看出,使用 WindowStore 的任务由 2 个段组成,每个段将创建一个 RocksDB 实例。

我期望的是每个分区都映射到一个任务。每个任务应该使用大约 200MB 的内存。此外,由于num.standby.replicas设置为 1,每个任务都会被复制。因此,总共应该有:

  • 40 个任务(因为 20 个分区和num.standby.replicas1 个)
  • 80 段(因为每个任务 2 段)
  • 80 个 RocksDB 实例(因为每个段 1 个 RocksDB 实例)

这意味着我应该期望 Kafka Streams 应用程序的每个实例负责 40/3 = 14 个任务。每个任务应该使用 200MB 即kafka_stream_state_block_cache_capacitymetric 是 200MB。

但是通过运行应用程序,我发现任务的数量是正确的,但是每个任务占用的内存,即kafka_stream_state_block_cache_capacity指标是 400MB 而不是 200MB。为什么会这样?

kafka_stream_state_block_cache_capacity{kafka_version="3.0.0",rocksdb_window_state_id="one.minute.window.count",spring_id="stream-builder-process",task_id="0_6",thread_id="7ed0af6a-244f-4b87-b4cf-f2f311df976c-StreamThread-1",} 4.194304E8
kafka_stream_state_block_cache_capacity{kafka_version="3.0.0",rocksdb_window_state_id="one.minute.window.count",spring_id="stream-builder-process",task_id="0_8",thread_id="7ed0af6a-244f-4b87-b4cf-f2f311df976c-StreamThread-1",} 4.194304E8
kafka_stream_state_block_cache_capacity{kafka_version="3.0.0",rocksdb_window_state_id="one.minute.window.count",spring_id="stream-builder-process",task_id="0_9",thread_id="7ed0af6a-244f-4b87-b4cf-f2f311df976c-StreamThread-1",} 4.194304E8

4

0 回答 0