1

我想创建一个异步读取kafka消息并使用队列通道来累积要处理的消息数量的流,并且只有在处理这些消息(例如50条消息)结束时,它才能处理另外50条或它释放该队列中的空间。我尝试使用从kafka委托读取另一个流的流,该流带有带有PollerMetadata(Pollers.fixedDelay(500).maxMessagesPerPoll(50))的QueueChannel,但是轮询器使用单个线程来读取那里的消息,我无法并行处理50 条消息,如果我在轮询器中放置一个执行程序,它将像普通执行程序一样工作,它会累积消息,并且在我有一个新线程可供他从 kafka 获取另一条消息之前,它不会挂在 50 条消息。

目标是并行处理多达 50 条 kafka 消息,但他只会在此队列释放时再次在 kafka(consumer.pool)中读取,但他正在从 kafka 中无限读取并在执行程序的限制数量内处理或poller,我可以使用带有kafka的spring集成流来实现这个目标吗?

每个消费者主题只有这个配置就够了吗?日志总是打印相同的线程:[ntainer#0-1-C-1]即使我设置了 10 来表示并发

块引用

> Kafka.messageDrivenChannelAdapter(consumerFactory,
> 主题).configureListenerContainer { kafkaMessageListenerContainer ->  
> kafkaMessageListenerContainer.concurrency(并发)               
> kafkaMessageListenerContainer.ackMode(ContainerProperties.AckMode.RECORD)
> }
> .errorChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)

4

1 回答 1

1

您永远不应该使用队列通道或使用 Kafka 执行任何异步处理。跟踪主题/分区内的偏移量太难了。您将面临丢失消息的风险。

相反,为了增加并发,增加主题中的分区数量,并设置监听器容器并发来获取你需要的消费者数量(例如50)。

您通常应该拥有比消费者更多的分区,但您至少需要同样多的分区,因为组中只有一个消费者可以从一个分区消费。

于 2020-04-09T22:42:25.677 回答