我是使用 Apache Flume 的新手,我很难理解它是如何工作的。为了解释我的问题,所以我解释了我的需要和我做了什么。
我想在 csv 文件目录(这些文件每 5 分钟构建一次)和 HDFS 集群之间配置一个流。
我确定“假脱机目录”源和 HDFS 接收器是我需要的。那就是给我这个flume.conf文件
agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = hdfsSink
# For each one of the sources, the type is defined
agent.sources.seqGenSrc.type = spooldir
agent.sources.seqGenSrc.spoolDir = /home/user/data
# The channel can be defined as follows.
agent.sources.seqGenSrc.channels = memoryChannel
# Each sink's type must be defined
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.path = hdfs://localhost/Flume/data
agent.sinks.hdfsSink.hdfs.fileType = DataStream
agent.sinks.hdfsSink.hdfs.writeFormat=Text
#Specify the channel the sink should use
agent.sinks.hdfsSink.channel = memoryChannel
# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100
结果是输入文件在我的本地文件系统上被重命名为“.complete”,并且数据被上传到 HDFS 上,新名称我猜是唯一的,由 Flume 生成。
这几乎是我所需要的。
但在上传之前,我想做一些文件特定的操作(删除标题,转义逗号..)。我不知道该怎么做,我考虑使用拦截器。但是,当数据在水槽中时,它会在事件中转换并流式传输。在他的点上,没有文件的知识。
否则,文件名中会写入原始时间事件,所以我希望这个时间与我的事件相关联,而不是与当前日期相关联。
我还想将原始文件名保留在 hdfs 中(其中有一些有用的信息)。
有人有什么建议可以帮助我吗?