0

我正在设置一个新的 Kafka 流应用程序,并希望使用 RocksDb 使用自定义状态存储。这适用于将数据放入状态存储并从中获取可查询的状态存储并迭代数据,但是,大约 72 小时后,我观察到存储中缺少数据。Kafka 流或 RocksDb 中状态存储的数据是否有默认保留时间?

我正在使用使用 RocksDb 的自定义状态存储,以便我们可以利用列族功能,我们不能将其与带有 KStreams 的嵌入式 RocksDb 实现一起使用。我已经使用 KeyValueStore 接口实现了自定义存储。还有我自己的 StoreSupplier、StoreBuilder、StoreType 和 StoreWrapper。为应用程序创建了一个变更日志主题,但还没有数据进入它(还没有研究这个问题)。

将数据放入此自定义状态存储并从中获取可查询的状态存储工作正常。但是,我发现在距商店约 72 小时后丢失了数据。我通过获取状态存储目录的大小以及将数据导出到文件并检查条目数来进行检查。

使用 SNAPPY 压缩和 UNIVERSAL 压缩

简单拓扑:

            final StreamsBuilder builder = new StreamsBuilder();
            String storeName = "store-name"
            List<String> cfNames = new ArrayList<>();


            // Hybrid custom store
            final StoreBuilder customStore = new RocksDBColumnFamilyStoreBuilder(storeName, cfNames);
            builder.addStateStore(customStore);

            KStream<String, String> inputstream = builder.stream(
                    inputTopicName,
                    Consumed.with(Serdes.String(), Serdes.String()
                    ));

            inputstream
                    .transform(() -> new CurrentTransformer(storeName), storeName);

            Topology tp = builder.build();

来自自定义商店实现的片段:

 RocksDBColumnFamilyStore(final String name, final String parentDir, List<String> columnFamilyNames) {
     .....  
     ......

        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig()
                .setBlockCache(cache)
                .setBlockSize(BLOCK_SIZE)
                .setCacheIndexAndFilterBlocks(true)
                .setPinL0FilterAndIndexBlocksInCache(true)
                .setFilterPolicy(filter)
                .setCacheIndexAndFilterBlocksWithHighPriority(true)
                .setPinTopLevelIndexAndFilter(true)
                ;


        cfOptions = new ColumnFamilyOptions()
                .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
                .setCompactionStyle(CompactionStyle.UNIVERSAL)
                .setMaxWriteBufferNumber(MAX_WRITE_BUFFERS)
                .setOptimizeFiltersForHits(true)
                .setLevelCompactionDynamicLevelBytes(true)
                .setTableFormatConfig(tableConfig);


        columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOptions));

        columnFamilyNames.stream().forEach((cfName) -> columnFamilyDescriptors.add(new ColumnFamilyDescriptor(cfName.getBytes(), cfOptions)));
    }

    @SuppressWarnings("unchecked")
    public void openDB(final ProcessorContext context) {
        Options opts = new Options()
                .prepareForBulkLoad();

        options = new DBOptions(opts)
                .setCreateIfMissing(true)
                .setErrorIfExists(false)
                .setInfoLogLevel(InfoLogLevel.INFO_LEVEL)
                .setMaxOpenFiles(-1)
                .setWriteBufferManager(writeBufferManager)
                .setIncreaseParallelism(Math.max(Runtime.getRuntime().availableProcessors(), 2))
                .setCreateMissingColumnFamilies(true);

        fOptions = new FlushOptions();
        fOptions.setWaitForFlush(true);

        dbDir = new File(new File(context.stateDir(), parentDir), name);

            try {
               Files.createDirectories(dbDir.getParentFile().toPath());
                db = RocksDB.open(options, dbDir.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles);

                columnFamilyHandles.stream().forEach((handle) -> {
                    try {
                        columnFamilyMap.put(new String(handle.getName()), handle);
                    } catch (RocksDBException e) {
                        throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e);
                    }
                });
            } catch (RocksDBException e) {
                throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e);
            }
        open = true;
    }

期望状态存储(RocksDb)将无限期地保留数据,直到手动删除或存储磁盘出现故障。我不知道 Kafka 流已经引入了带有状态存储的 TTl。

4

0 回答 0