1

我们的系统中有一个场景,其中 kafka 主题 XYZ 用户详细信息由其他一些生产应用程序 A(不同的系统)发布,而我的应用程序 B 正在从该主题消费。

要求是应用程序 B 需要在 A 将其放入 kafka 主题 XYZ 后 45 分钟(或任何可配置时间)消费该事件(此延迟的原因是某些系统 C 的另一个 REST api 需要根据此用户详细信息触发特定用户的事件,以确认它是否为该用户设置了一些标志,并且可以在 45 分钟持续时间内的任何时间设置该标志,尽管如果 C 没有能力发布到 kafka 或通知我们,它可能已经解决以任何方式)。

我们的应用程序 B 是在 spring 中编写的。

我尝试的解决方案是从 Kafka 获取事件并检查队列中第一个事件的时间戳,如果该事件已经是 45 分钟,则处理它,或者如果它少于 45 分钟,则暂停轮询 kafka 容器以获取该数量使用MessageListnerContainer pause()方法达到 45 分钟的时间。像下面的东西 -

@KafkaListener(id = "delayed_listener", topics = "test_topic", groupId = "test_group")
        public void delayedConsumer(@Payload  String message,
                                    Acknowledgment acknowledgment) {

            UserDataEvent userDataEvent = null;
            try {
                 userDataEvent = this.mapper.readValue(message, TopicRequest.class);
            } catch (JsonProcessingException e) {
                logger.error("error while parsing message");
            }
            MessageListenerContainer delayedContainer = this.kafkaListenerEndpointRegistry.getListenerContainer("delayed_listener");
            if (userDataEvent.getPublishTime() > 45 minutes) // this will be some configured value
 {
                long sleepTimeForPolling = userDataEvent.getPublishTime() - System.currentTimeMillis();
                // give negative ack to put already polled messages back to kafka topic
                acknowledgment.nack(1000);
                // pause container, and later resume it  
                delayedContainer.pause();
                ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
                scheduledExecutorService.schedule(() -> {
                    delayedContainer.resume();
                }, sleepTimeForPolling, TimeUnit.MILLISECONDS);
                return;
            }
            // if message was already 45 minutes old then process it
            this.service.processMessage(userDataEvent);
            acknowledgment.acknowledge();
        }

虽然它适用于单个分区,但我不确定这是否是正确的方法,对此有何评论?我还看到多个分区会导致问题,因为上面的暂停方法调用将暂停整个容器,如果其中一个分区有旧消息,如果容器因其他分区中的新消息而暂停,则不会消耗它。我可以以某种方式在分区级别使用此暂停逻辑吗?

任何更好/推荐的解决方案可以在一定数量的可配置时间后实现这种延迟处理,我可以在这种情况下采用而不是做我上面所做的事情?

4

1 回答 1

2

Kafka 并不是真正为此类场景设计的。

我可以看到该技术有效的一种方法是将容器并发设置为与主题中的分区数相同,以便每个分区由不同线程上的不同使用者处理;然后暂停/恢复单个Consumer<?, ?>s 而不是整个容器。

为此,添加Consumer<?, ?>作为附加参数;要恢复消费者,idleEventInterval请在事件侦听器 ( ListenerContainerIdleEvent) 中设置并检查计时器。这Consumer<?, ?>是事件的属性,因此您可以resume()在那里调用。

于 2020-02-04T14:32:31.430 回答