2

由于源和接收器之间的速度差距,我正在测试水槽以将数据加载到 hHase 并考虑使用水槽的选择器和接收器进行并行数据加载。

所以,我想用水槽做的是

  1. 使用拦截器的 regex_extractor 类型创建事件的标头

  2. 使用选择器的多路复用类型将带有标题的事件多路复用到两个以上的通道

    在一个源通道汇。

并尝试如下配置。

    agent.sources = tailsrc
    agent.channels = mem1 mem2
    代理.sinks = std1 std2
    agent.sources.tailsrc.type = 执行
    agent.sources.tailsrc.command = tail -F /home/flumeuser/test/in.txt
    agent.sources.tailsrc.batchSize = 1
    
    agent.sources.tailsrc.interceptors = i1
    agent.sources.tailsrc.interceptors.i1.type = regex_extractor
    agent.sources.tailsrc.interceptors.i1.regex = ^(\\d)
    agent.sources.tailsrc.interceptors.i1.serializers = t1
    agent.sources.tailsrc.interceptors.i1.serializers.t1.name = type
    
    agent.sources.tailsrc.selector.type = 多路复用
    agent.sources.tailsrc.selector.header = 类型
    agent.sources.tailsrc.selector.mapping.1 = mem1
    agent.sources.tailsrc.selector.mapping.2 = mem2
    
    agent.sinks.std1.type = file_roll
    agent.sinks.std1.channel = mem1
    agent.sinks.std1.batchSize = 1
    agent.sinks.std1.sink.directory = /var/log/flumeout/1
    agent.sinks.std1.rollInterval = 0
    
    agent.sinks.std2.type = file_roll
    agent.sinks.std2.channel = mem2
    agent.sinks.std2.batchSize = 1
    agent.sinks.std2.sink.directory = /var/log/flumeout/2
    agent.sinks.std2.rollInterval = 0
    
    agent.channels.mem1.type = 内存
    agent.channels.mem1.capacity = 100
    
    agent.channels.mem2.type = 内存
    agent.channels.mem2.capacity = 100

但是,它不起作用!

删除选择器零件时,Flume的日志中会有一些拦截器调试消息。but when selector and interceptor are together, there are nothing.

是否有任何错误的表达或我错过了什么?

谢谢阅读。:)

4

1 回答 1

1

我找到了。

在水槽日志中,有如下警告消息。

    2013-10-10 16:34:20,514 (conf-file-poller-0) [WARN - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources(FlumeConfiguration.java:571)] 由于配置失败而删除了 tailsrc零件!

所以我附在下面一行

    agent.sources.tailsrc.channels = mem1 mem2

然后它起作用了!!!!

于 2013-10-11T01:53:47.873 回答