我正在尝试在 spring 集成中实现自定义入站通道适配器以使用来自apache kafka的消息。基于spring集成示例,我发现我需要创建一个实现MessageSource接口的类并实现receive()方法,该方法将从kafka返回消费的Message。但是基于kafka 中的消费者示例, KafkaStream 中的消息迭代器由 BlockingQueue 支持。因此,如果队列中没有消息,则线程将被阻塞。
那么实现 receive() 方法的最佳方法是什么,因为该方法可能会阻塞,直到有东西要消耗..?
在更一般的意义上,我们如何为流式消息源实现自定义入站通道,直到有东西准备好消费......?