0

我需要在我开发的 Flume 自定义源中收听 Rabbit Queue。这个要求在 Flume 中可能看起来很尴尬。但这就是它的需要。为了简单起见,我使用 Spring AMQP 来监听队列,我只是无法理解如何在 Flume 生命周期的 Start() 方法中调用 OnMessage() 方法,以便将消息发布到 Flume 通道上。我已经查看了 Spring MessageListenerAdapter 概念,但我找不到任何示例来实现相同的概念。

4

1 回答 1

1

onMessage()MessageListener模式的一部分。它是一些活动组件,由外部系统(从大高度)启动。并且每次都通过该远程命令起作用,因此您不能将其用作passive由用户调用启动的组件。

由于您从另一侧具有“Flume 生命周期 Start()”并且从另一侧SimpleMessageListenerContainer具有相同的,我想说您必须将它们的生命周期关联起来才能协同工作。

从这里你应该提供SimpleMessageListenerContainer一些内联MessageListener实现,它调用所需的方法来“发布到 Flume 通道”。

高温高压

更新

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
....
container.setMessageListener(new MessageListener() {

   public void onMessage(Message message) {
       sendMessageToFlumeChannel(message);
   }

});

其中sendMessageToFlumeChannel是控股类的方法。

当然它可以是任何 POJO 而不是MessageListener实现,但主要目标是将侦听器结果委托给某个方法。

于 2014-08-07T10:42:09.100 回答