0

我的服务正在使用 Kafka 键值存储。我已经在两个处理器中安排了两个任务,这些任务从存储中删除了格式错误和过时的记录。但是,我只有在停止本地实例后才能看到更新的磁盘空间。我检查了,无论任务是否正在运行,从我的 java 进程到 RocksDB 都有很多句柄。

这是我的代码:

public void init(ProcessorContext processorContext) {
store = (KeyValueStore<String, AssistantMessage>) processorContext.getStateStore(storeName);
processorContext.schedule(Duration.ofHours(6), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
  LOGGER.info("Starting execution of task for deletion of assistant group messages.");
  try( KeyValueIterator<String, AssistantMessage> iterator = store.all()) {
    while (iterator.hasNext()) {
      KeyValue<String, AssistantMessage> keyValue = iterator.next();
      if (CommonUtil.isRecordExpired(new EffectiveDatedAssistantMessage(keyValue.value))) {
        store.delete(keyValue.key);
      }
    }
  }
  LOGGER.info("Finished execution of task for deletion of assistant group messages.");
});

}

其他处理器的init方法中的代码几乎相同。我正在使用默认的 RocksDB 配置。任何帮助,将不胜感激。

4

0 回答 0