我是 Flume 和 Hadoop 的新手。我们正在开发一个 BI 模块,我们可以在其中将来自不同服务器的所有日志存储在 HDFS 中。
为此,我正在使用 Flume。我刚开始尝试。成功创建了一个节点,但现在我愿意设置一个 HTTP 源和一个接收器,它将通过 HTTP 将传入请求写入本地文件。
有什么建议吗?
提前致谢/
希望这可以帮助您入门。我在我的机器上测试它时遇到了一些问题,现在没有时间完全解决它,但我会解决的......
假设你现在已经启动并运行了 Flume,这应该是你的 flume.conf 文件需要使用 HTTP POST 源和本地文件接收器的样子(注意:这是本地文件,而不是 HDFS)
########## NEW AGENT ##########
# flume-ng agent -f /etc/flume/conf/flume.httptest.conf -n httpagent
#
# slagent = SysLogAgent
###############################
httpagent.sources = http-source
httpagent.sinks = local-file-sink
httpagent.channels = ch3
# Define / Configure Source (multiport seems to support newer "stuff")
###############################
httpagent.sources.http-source.type = org.apache.flume.source.http.HTTPSource
httpagent.sources.http-source.channels = ch3
httpagent.sources.http-source.port = 81
# Local File Sink
###############################
httpagent.sinks.local-file-sink.type = file_roll
httpagent.sinks.local-file-sink.channel = ch3
httpagent.sinks.local-file-sink.sink.directory = /root/Desktop/http_test
httpagent.sinks.local-file-sink.rollInterval = 5
# Channels
###############################
httpagent.channels.ch3.type = memory
httpagent.channels.ch3.capacity = 1000
使用第二行的命令启动 Flume。根据您的需要调整它(尤其是端口、sink.directory 和 rollInterval)。这是一个非常简单的最低配置文件,还有更多可用选项,请查看 Flume 用户指南。现在,就目前而言,代理启动并运行良好......
这是我没有时间测试的。默认情况下,HTTP 代理接受 JSON 格式的数据。您 - 应该 - 能够通过发送带有如下形式的 cURL 请求来测试此代理:
curl -X POST -H 'Content-Type: application/json; charset=UTF-8' -d '{"username":"xyz","password":"123"}' http://yourdomain.com:81/
-X 将请求设置为 POST,-H 发送标头,-d 发送数据(有效的 json),然后是主机:端口。对我来说,问题是我收到一个错误:
WARN http.HTTPSource: Received bad request from client. org.apache.flume.source.http.HTTPBadRequestException: Request has invalid JSON Syntax.
在我的 Flume 客户端中,JSON 无效?所以有些东西是错误的。弹出错误的事实表明 Flume 源正在接收数据。无论你有什么,只要它的格式有效,它就应该可以工作。
从问题的措辞中很难准确说出您想要什么,但我假设您想使用 HTTP POST 请求将 JSON 发送到 Flume,然后让 Flume 将这些 JSON 事件转储到 HDFS(不是本地文件系统)。如果这就是你想做的,这就是你需要做的。
确保首先在 HDFS 中为 Flume 创建一个目录以将事件发送到。例如,如果您想/user/flume/events
在 HDFS 中发送事件,您可能必须运行以下命令:
$ su - hdfs
$ hdfs dfs -mkdir /user/flume
$ hdfs dfs -mkdir /user/flume/events
$ hdfs dfs -chmod -R 777 /user/flume
$ hdfs dfs -chown -R flume /user/flume
配置 Flume 以使用 HTTP Source 和 HDFS Sink。您需要确保添加主机和时间戳的拦截器,否则您的事件将导致 HDFS 接收器中的异常,因为该接收器在事件标头中需要主机和时间戳。还要确保暴露 Flume HTTPSource 正在侦听的服务器上的端口。
这是适用于 CDH-5.7.0 的 Cloudera Quickstart Docker 容器的示例 Flume 配置
# Please paste flume.conf here. Example: # Sources, channels, and sinks are defined per # agent name, in this case 'tier1'. tier1.sources = source1 tier1.channels = channel1 tier1.sinks = sink1 tier1.sources.source1.interceptors = i1 i2 tier1.sources.source1.interceptors.i1.type = host tier1.sources.source1.interceptors.i1.preserveExisting = false tier1.sources.source1.interceptors.i1.hostHeader = host tier1.sources.source1.interceptors.i2.type = timestamp # For each source, channel, and sink, set # standard properties. tier1.sources.source1.type = http tier1.sources.source1.bind = 0.0.0.0 tier1.sources.source1.port = 5140 # JSONHandler is the default for the httpsource # tier1.sources.source1.handler = org.apache.flume.source.http.JSONHandler tier1.sources.source1.channels = channel1 tier1.channels.channel1.type = memory tier1.sinks.sink1.type = hdfs tier1.sinks.sink1.hdfs.path = /user/flume/events/%y-%m-%d/%H%M/%S tier1.sinks.sink1.hdfs.filePrefix = event-file-prefix- tier1.sinks.sink1.hdfs.round = false tier1.sinks.sink1.channel = channel1 # Other properties are specific to each type of # source, channel, or sink. In this case, we # specify the capacity of the memory channel. tier1.channels.channel1.capacity = 1000
有必要创建一个 Flume 客户端,它可以将 JSON 事件以它期望的格式发送到 Flume HTTP(这个客户端可以像curl
请求一样简单)。关于格式最重要的一点是 JSON"body":
键必须有一个 String值。 "body":
不能是 JSON 对象 - 如果是,Gson
FlumeJSONHandler
用于解析 JSONEvents 的库将抛出异常,因为它无法解析 JSON - 它需要一个字符串。
这是您需要的 JSON 格式:
[ { "headers": { "timestamp": "434324343", "host": "localhost", }, "body": "No matter what, this must be a String, not a list or a JSON object", }, { ... following events take the same format as the one above ...} ]
/var/log/flume-ng/
)。要解决此问题,请增加tier1.channels.channel1.capacity
.试试这个 :
curl -X POST -H '内容类型:应用程序/json;charset=UTF-8' -d '[{"username":"xrqwrqwryzas","password":"12124sfsfsfas123"}]' http://yourdomain.com:81/