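I'm trying to schedule a task in one of my Kafka processors that deletes records from a local KeyValueStore (RocksDB). No exception has been thrown so far, but no records are being deleted either. Here is my code: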
        processorContext.schedule(Duration.ofHours(6), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            store.all().forEachRemaining(keyValue -> {
                // CommonUtil.removeOutdatedMessage(store,keyValue.key,keyValue.value.getSentAt());
                LocalDateTime sentAt = null;
                try {
                    sentAt = LocalDateTime.parse(keyValue.value.getSentAt(), DateTimeFormatter.ISO_OFFSET_DATE_TIME);
                } catch (DateTimeException ex) {
                    LOGGER.warn("Parsing of date {} failed for message with: {}", keyValue.value.getSentAt(), ex);
                }
                if (sentAt == null) {
                    store.delete(keyValue.key);
                } else {
                    boolean isExpired = sentAt.isBefore(LocalDateTime.now().minusDays(Constants.MESSAGE_EXPIRATION_LIMIT));
                    if (isExpired) {
                        store.delete(keyValue.key);
                    }
                }
            });
        });
    }
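
To rule out the date comparison as the cause, the expiry check can be pulled out of the punctuator and exercised without Kafka. The following is only a minimal sketch under stated assumptions: `ExpiryCheck` and `shouldDelete` are hypothetical names that do not exist in my code, the sample timestamps are made up, and treating a null `sentAt` as deletable is an assumption. It compares `OffsetDateTime` values instead of `LocalDateTime`, so the offset in the stored string is not dropped. It does not by itself explain why nothing is deleted.

```java
import java.time.DateTimeException;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper (not part of the original code): isolates the
// "should this record be deleted?" decision so it can be tested alone.
class ExpiryCheck {

    // Returns true when the record should be removed from the store.
    // Assumption: a null or unparseable sentAt counts as deletable,
    // matching the intent of the `sentAt == null` branch above.
    public static boolean shouldDelete(String sentAt, OffsetDateTime now, long limitDays) {
        if (sentAt == null) {
            return true;
        }
        try {
            OffsetDateTime parsed =
                    OffsetDateTime.parse(sentAt, DateTimeFormatter.ISO_OFFSET_DATE_TIME);
            // Both sides carry an offset, so they are compared as instants.
            return parsed.isBefore(now.minusDays(limitDays));
        } catch (DateTimeException ex) {
            // DateTimeParseException is a subclass of DateTimeException.
            return true;
        }
    }

    public static void main(String[] args) {
        // Fixed "now" so the result does not depend on the wall clock.
        OffsetDateTime now = OffsetDateTime.parse("2024-01-10T00:00:00Z");
        System.out.println(shouldDelete("2024-01-01T00:00:00+01:00", now, 7)); // true
        System.out.println(shouldDelete("2024-01-09T12:00:00Z", now, 7));      // false
        System.out.println(shouldDelete("not-a-date", now, 7));                // true
    }
}
```

If this check behaves as expected for the values actually in the store, the comparison is probably not the problem, and it would make more sense to look at whether the punctuator is firing at all with the 6-hour `WALL_CLOCK_TIME` interval.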