我们的系统中有一个场景,其中 kafka 主题 XYZ 用户详细信息由其他一些生产应用程序 A(不同的系统)发布,而我的应用程序 B 正在从该主题消费。
要求是应用程序 B 需要在 A 将其放入 kafka 主题 XYZ 后 45 分钟(或任何可配置时间)消费该事件(此延迟的原因是某些系统 C 的另一个 REST api 需要根据此用户详细信息触发特定用户的事件,以确认它是否为该用户设置了一些标志,并且可以在 45 分钟持续时间内的任何时间设置该标志,尽管如果 C 没有能力发布到 kafka 或通知我们,它可能已经解决以任何方式)。
我们的应用程序 B 是在 spring 中编写的。
我尝试的解决方案是从 Kafka 获取事件并检查队列中第一个事件的时间戳,如果该事件已经是 45 分钟,则处理它,或者如果它少于 45 分钟,则暂停轮询 kafka 容器以获取该数量使用MessageListnerContainer pause()方法达到 45 分钟的时间。像下面的东西 -
@KafkaListener(id = "delayed_listener", topics = "test_topic", groupId = "test_group")
public void delayedConsumer(@Payload String message,
Acknowledgment acknowledgment) {
UserDataEvent userDataEvent = null;
try {
userDataEvent = this.mapper.readValue(message, TopicRequest.class);
} catch (JsonProcessingException e) {
logger.error("error while parsing message");
}
MessageListenerContainer delayedContainer = this.kafkaListenerEndpointRegistry.getListenerContainer("delayed_listener");
if (userDataEvent.getPublishTime() > 45 minutes) // this will be some configured value
{
long sleepTimeForPolling = userDataEvent.getPublishTime() - System.currentTimeMillis();
// give negative ack to put already polled messages back to kafka topic
acknowledgment.nack(1000);
// pause container, and later resume it
delayedContainer.pause();
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.schedule(() -> {
delayedContainer.resume();
}, sleepTimeForPolling, TimeUnit.MILLISECONDS);
return;
}
// if message was already 45 minutes old then process it
this.service.processMessage(userDataEvent);
acknowledgment.acknowledge();
}
虽然它适用于单个分区,但我不确定这是否是正确的方法,对此有何评论?我还看到多个分区会导致问题,因为上面的暂停方法调用将暂停整个容器,如果其中一个分区有旧消息,如果容器因其他分区中的新消息而暂停,则不会消耗它。我可以以某种方式在分区级别使用此暂停逻辑吗?
任何更好/推荐的解决方案可以在一定数量的可配置时间后实现这种延迟处理,我可以在这种情况下采用而不是做我上面所做的事情?