0

我们计划使用 kafka flume-ng 集成(Flafka),其中 flume 是 kafka 队列的消费者。Flume 代理将接收列出命令及其输出的文件,如下所示:

root@host> [Command1]

[Output1]

root@host> [Command2]

[Output2]

该文件可能包含多个命令,并且命令的输出可能很大。我们需要拦截事件(也就是文件数据),并根据命令将事件拆分成多个事件。然后源将流扇出到多个通道,将每个子事件发送到一个通道(使用多路复用),每个接收器将命令信息存储到相应的 Hive 表中。是否可以使用扇出流将事件拆分为多个事件?或者如果我以其他方式问,我们可以在拦截器中将一个事件拆分为多个事件吗?

我已阅读有关正则表达式提取器拦截器和序列化程序的信息,但不确定它是否对这种情况有任何帮助。

4

3 回答 3

1

如果我理解得很好,您需要将来自 Kafka 队列的原始事件拆分为几个,比如说,子事件。你想知道哪一块 Flume 可以做到这一点。

我认为拦截器不适合这个目的,因为拦截器被“放置”在源和通道之间,它们旨在添加、删除或修改有关 Flume 事件的标头,然后再将其放入通道;同样,他们可以放弃整个事件。但是他们无法根据其他现有事件生成多个事件。

我认为您正在寻找类似于附加到源的处理程序,能够解释从 Kafka 获取的事件并在源输出中生成多个 Flume 事件。这个概念类似于你可以附加到的处理程序HTTPSoure(更多细节在这里)。如果您的源代码可以做到这一点,那么您很可能必须开发自己的自定义处理程序,因为您需要的功能非常具体。

于 2015-03-11T10:56:18.750 回答
1

谢谢frb的回复。

我想将传入事件拆分为一个水槽源到多个子事件并将它们发送到各自的通道。因此拓扑中的第一个水槽节点会将每个子事件(使用多路复用)路由到可以处理此类信息的特定跃点。

根据您的回复,我了解使用拦截器无法完成。您能否分享处理程序的任何示例或文档?

于 2015-03-11T13:14:07.230 回答
0

是的,水槽不能将事件拆分为多个。这是我对这种方法的替代解决方案,以 Kafka 源为例。

首先实现一个扩展Kafka源的源类,替换默认的ChannelProcessor对象。

public class XXXSplitSource extends KafkaSource {

    @Override
    public synchronized ChannelProcessor getChannelProcessor()
    {
        return new XXXYourChannelProcessorProxy(super.getChannelProcessor());
    }
}

然后,在 ChannelProcessor 代理实现中,您可以使用自定义函数拆分事件。

public class XXXYourChannelProcessorProxy  extends ChannelProcessor {
    public ChannelProcessor  m_downstreamChannelProcessor = null;

    public XXXYourChannelProcessorProxy (ChannelSelector selector) {
        super(selector);
    }

    public XXXYourChannelProcessorProxy (ChannelProcessor processor) {
        super(null);
        m_downstreamChannelProcessor = processor;
    }

    @Override
    public void processEventBatch(List<Event> events) {
        List<Event> generatedEvents = YOUR_SPLIT_FUNCTION_HERE(events);
        m_downstreamChannelProcessor.processEventBatch(generatedEvents);    
    }
}
于 2015-12-21T03:38:22.503 回答