0

我正在尝试在 spring 集成中实现自定义入站通道适配器以使用来自apache kafka的消息。基于spring集成示例,我发现我需要创建一个实现MessageSource接口的类并实现receive()方法,该方法将从kafka返回消费的Message。但是基于kafka 中的消费者示例, KafkaStream 中的消息迭代器由 BlockingQueue 支持。因此,如果队列中没有消息,则线程将被阻塞。

那么实现 receive() 方法的最佳方法是什么,因为该方法可能会阻塞,直到有东西要消耗..?

在更一般的意义上,我们如何为流式消息源实现自定义入站通道,直到有东西准备好消费......?

4

1 回答 1

4

receive() 方法可以阻塞(只要底层操作正确响应中断的线程),并且从入站通道适配器的角度来看,根据底层源的期望,最好使用固定的-延迟触发。例如,“长轮询”可以在提供非常小的延迟值时模拟事件驱动的行为。

我们的 JMS 轮询 MessageSource 实现中也有类似的情况。在那里,底层行为由 JmsTemplate 的 receive() 方法之一处理。JmsTemplate 本身允许配置超时值。这意味着,例如,您可以选择最多阻塞 5 秒,然后在每个阻塞接收调用之间有一个非常短的延迟触发。或者,您可以指定无限期接收超时。该决定最终取决于对底层资源、消息吞吐量等的期望。

另外,我想让你知道我们正在自己探索 Kafka 适配器。也许您想在 spring-integration-extensions 存储库中进行协作?

问候,马克

于 2013-02-01T17:12:24.477 回答