3

我有一个 http 服务器写入日志文件,然后我使用 Flume 将其加载到 HDFS 首先我想根据我在标题或正文中的数据过滤数据。我读到我可以使用带有正则表达式的拦截器来做到这一点,有人可以准确解释我需要做什么吗?我是否需要编写覆盖 Flume 代码的 Java 代码?

另外我想获取数据并根据标头将其发送到不同的接收器(即 source=1 到 sink1 和 source=2 到 sink2)这是如何完成的?

谢谢你,

西蒙

4

2 回答 2

11

您无需编写 Java 代码来过滤事件。使用Regex Filtering Interceptor过滤正文匹配某个正则表达式的事件:

agent.sources.logs_source.interceptors = regex_filter_interceptor
agent.sources.logs_source.interceptors.regex_filter_interceptor.type = regex_filter
agent.sources.logs_source.interceptors.regex_filter_interceptor.regex = <your regex>
agent.sources.logs_source.interceptors.regex_filter_interceptor.excludeEvents = true

要基于标头路由事件,请使用Multiplexing Channel Selector

a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4

此处,标题为“state”="CZ" 的事件进入通道“c1”,“state”="US" - 进入“c2”和“c3”,所有其他事件 - 进入“c4”。

这样,您还可以按标头过滤事件 - 只需将特定标头值路由到指向Null Sink的通道。

于 2013-07-22T10:48:31.880 回答
0

您可以使用水槽通道选择器将事件简单地路由到不同的目的地。或者您可以将多个 Flume 代理链接在一起以实现复杂的路由功能。但是链式水槽代理将变得有点难以维护(资源使用和水槽拓扑)。你可以看看flume-ng router sink,它可能会提供一些你想要的功能。

首先,通过flume拦截器在事件头中添加特定字段

a1.sources = r1 r2
a1.channels = c1 c2
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK
a1.sources.r2.channels =  c2
a1.sources.r2.type = seq
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = datacenter
a1.sources.r2.interceptors.i2.value = BERKELEY

然后,您可以设置您的水槽通道选择器,如:

a2.sources = r2
a2.sources.channels = c1 c2 c3 c4
a2.sources.r2.selector.type = multiplexing
a2.sources.r2.selector.header = datacenter
a2.sources.r2.selector.mapping.NEW_YORK = c1
a2.sources.r2.selector.mapping.BERKELEY= c2 c3
a2.sources.r2.selector.default = c4

或者,您可以设置 avro-router 接收器,如:

agent.sinks.routerSink.type = com.datums.stream.AvroRouterSink
agent.sinks.routerSink.hostname = test_host
agent.sinks.routerSink.port = 34541
agent.sinks.routerSink.channel = memoryChannel

# Set sink name
agent.sinks.routerSink.component.name = AvroRouterSink

# Set header name for routing
agent.sinks.routerSink.condition = datacenter

# Set routing conditions
agent.sinks.routerSink.conditions = east,west
agent.sinks.routerSink.conditions.east.if = ^NEW_YORK
agent.sinks.routerSink.conditions.east.then.hostname = east_host
agent.sinks.routerSink.conditions.east.then.port = 34542
agent.sinks.routerSink.conditions.west.if = ^BERKELEY
agent.sinks.routerSink.conditions.west.then.hostname = west_host
agent.sinks.routerSink.conditions.west.then.port = 34543
于 2014-10-06T01:49:57.673 回答