我想创建一个异步读取kafka消息并使用队列通道来累积要处理的消息数量的流,并且只有在处理这些消息(例如50条消息)结束时,它才能处理另外50条或它释放该队列中的空间。我尝试使用从kafka委托读取另一个流的流,该流带有带有PollerMetadata(Pollers.fixedDelay(500).maxMessagesPerPoll(50))的QueueChannel,但是轮询器使用单个线程来读取那里的消息,我无法并行处理50 条消息,如果我在轮询器中放置一个执行程序,它将像普通执行程序一样工作,它会累积消息,并且在我有一个新线程可供他从 kafka 获取另一条消息之前,它不会挂在 50 条消息。
目标是并行处理多达 50 条 kafka 消息,但他只会在此队列释放时再次在 kafka(consumer.pool)中读取,但他正在从 kafka 中无限读取并在执行程序的限制数量内处理或poller,我可以使用带有kafka的spring集成流来实现这个目标吗?
每个消费者主题只有这个配置就够了吗?日志总是打印相同的线程:[ntainer#0-1-C-1]即使我设置了 10 来表示并发
块引用
> Kafka.messageDrivenChannelAdapter(consumerFactory,
> 主题).configureListenerContainer { kafkaMessageListenerContainer ->
> kafkaMessageListenerContainer.concurrency(并发)
> kafkaMessageListenerContainer.ackMode(ContainerProperties.AckMode.RECORD)
> }
> .errorChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)