0

我正在尝试在我的一个 Kafka 处理器中安排一项任务,以从本地 KeyValueStore (RocksDB) 中删除记录。即使到目前为止没有出现异常,也没有任何记录被删除。这是我的代码:

 processorContext.schedule(Duration.ofHours(6), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
  store.all().forEachRemaining(keyValue -> {
    // CommonUtil.removeOutdatedMessage(store,keyValue.key,keyValue.value.getSentAt());
    LocalDateTime sentAt = null;
    try {
      sentAt = LocalDateTime.parse(keyValue.value.getSentAt(), DateTimeFormatter.ISO_OFFSET_DATE_TIME);
    } catch (DateTimeException ex) {
      LOGGER.warn("Parsing of date {} failed for message with: {}", keyValue.value.getSentAt(), ex);
    }

    if (sentAt == null) {
      store.delete(keyValue.key);
    } else {
      boolean isExpired = sentAt.isBefore(LocalDateTime.now().minusDays(Constants.MESSAGE_EXPIRATION_LIMIT));
      if (isExpired) {
        store.delete(keyValue.key);
      }
    }
  });
}); 

}

4

1 回答 1

0

您尚未提供有关您正在解析的日期值的任何详细信息。另外,您是否看到日期解析失败的日志消息?多一点细节肯定会有所帮助。

根据您共享的代码片段,我假设sentAt variable is being set as nonNull由于异常或任何原因,并且isExpired as false.

因此,您看不到任何删除发生。放置调试点以找出代码中的对象流。

这是您的 Java 代码中的问题,而不是 Kafka 状态存储中的问题。

于 2021-02-09T22:25:37.890 回答