由于源和接收器之间的速度差距,我正在测试水槽以将数据加载到 hHase 并考虑使用水槽的选择器和接收器进行并行数据加载。
所以,我想用水槽做的是
使用拦截器的 regex_extractor 类型创建事件的标头
使用选择器的多路复用类型将带有标题的事件多路复用到两个以上的通道
在一个源通道汇。
并尝试如下配置。
agent.sources = tailsrc
agent.channels = mem1 mem2
代理.sinks = std1 std2
agent.sources.tailsrc.type = 执行
agent.sources.tailsrc.command = tail -F /home/flumeuser/test/in.txt
agent.sources.tailsrc.batchSize = 1
agent.sources.tailsrc.interceptors = i1
agent.sources.tailsrc.interceptors.i1.type = regex_extractor
agent.sources.tailsrc.interceptors.i1.regex = ^(\\d)
agent.sources.tailsrc.interceptors.i1.serializers = t1
agent.sources.tailsrc.interceptors.i1.serializers.t1.name = type
agent.sources.tailsrc.selector.type = 多路复用
agent.sources.tailsrc.selector.header = 类型
agent.sources.tailsrc.selector.mapping.1 = mem1
agent.sources.tailsrc.selector.mapping.2 = mem2
agent.sinks.std1.type = file_roll
agent.sinks.std1.channel = mem1
agent.sinks.std1.batchSize = 1
agent.sinks.std1.sink.directory = /var/log/flumeout/1
agent.sinks.std1.rollInterval = 0
agent.sinks.std2.type = file_roll
agent.sinks.std2.channel = mem2
agent.sinks.std2.batchSize = 1
agent.sinks.std2.sink.directory = /var/log/flumeout/2
agent.sinks.std2.rollInterval = 0
agent.channels.mem1.type = 内存
agent.channels.mem1.capacity = 100
agent.channels.mem2.type = 内存
agent.channels.mem2.capacity = 100
但是,它不起作用!
删除选择器零件时,Flume的日志中会有一些拦截器调试消息。but when selector and interceptor are together, there are nothing.
是否有任何错误的表达或我错过了什么?
谢谢阅读。:)