我有一个假脱机目录,其中存在所有 json 文件,传入的文件将每秒添加到此目录,我必须反序列化传入的 json 文件并获取所需字段并将其附加到 HDFS 目录中。
我所做的是创建了一个水槽 conf 文件,其中将 spooling 目录中的文件作为源,并使用 1 Sink 将 json 文件直接放入 HDFS。
我必须在 Sink 之前将此 json 转换为结构格式并将其放入 HDFS。最重要的是,它不是推特数据。而且我必须实现纯粹的 Flume。
我使用以下水槽配置来完成工作:
agent_slave_1.channels.fileChannel1_1.type = file
agent_slave_1.channels.fileChannel1_1.capacity = 200000
agent_slave_1.channels.fileChannel1_1.transactionCapacity = 1000
agent_slave_1.sources.source1_1.type = spooldir
agent_slave_1.sources.source1_1.spoolDir = /home/cloudera/runs/
agent_slave_1.sources.source1_1.fileHeader = false
agent_slave_1.sources.source1_1.fileSuffix = .COMPLETED
agent_slave_1.sinks.hdfs-sink1_1.type = hdfs
agent_slave_1.sinks.hdfs-sink1_1.hdfs.path =hdfs://localhost.localdomain:8020/user/cloudera/runs_scored/
agent_slave_1.sinks.hdfs-sink1_1.hdfs.batchSize = 1000
agent_slave_1.sinks.hdfs-sink1_1.hdfs.rollSize = 268435456
agent_slave_1.sinks.hdfs-sink1_1.hdfs.rollInterval = 0
agent_slave_1.sinks.hdfs-sink1_1.hdfs.rollCount = 50000000
agent_slave_1.sinks.hdfs-sink1_1.hdfs.writeFormat=Text
agent_slave_1.sinks.hdfs-sink1_1.hdfsfileType = DataStream
agent_slave_1.sources.source1_1.channels = fileChannel1_1
agent_slave_1.sinks.hdfs-sink1_1.channel = fileChannel1_1
agent_slave_1.sinks = hdfs-sink1_1
agent_slave_1.sources = source1_1
agent_slave_1.channels = fileChannel1_1
但我不知道如何使用反序列化器。
有人可以帮助我了解如何反序列化传入的 Json 文件吗?如果我需要用java编写任何代码,请帮助我,我需要使用什么接口?如果可能的话,给一些提示。