
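I am using an HTTP source to put JSON files into HDFS (single-node sandbox).

The file is created in the correct directory, but nothing is ever appended to it. Before I start debugging the HTTP source, could you check my flume.conf?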

#################################################################
# Name the components on this agent
#################################################################

hdfs-agent.sources = httpsource
hdfs-agent.sinks = hdfssink
hdfs-agent.channels = channel1

#################################################################
# Describe source
#################################################################

# Source node
hdfs-agent.sources.httpsource.type = http 
hdfs-agent.sources.httpsource.port = 5140
hdfs-agent.sources.httpsource.handler = org.apache.flume.source.http.JSONHandler

#################################################################
# Describe Sink
#################################################################

# Sink hdfs
hdfs-agent.sinks.hdfssink.type = hdfs
hdfs-agent.sinks.hdfssink.hdfs.path = hdfs://sandbox:8020/user/flume/node
hdfs-agent.sinks.hdfssink.hdfs.fileType = DataStream
hdfs-agent.sinks.hdfssink.hdfs.batchSize = 1
hdfs-agent.sinks.hdfssink.hdfs.rollSize = 0
hdfs-agent.sinks.hdfssink.hdfs.rollCount = 0

#################################################################
# Describe channel
#################################################################

# Channel memory
hdfs-agent.channels.channel1.type = memory
hdfs-agent.channels.channel1.capacity = 1000
hdfs-agent.channels.channel1.transactionCapacity = 100


#################################################################
# Bind the source and sink to the channel
#################################################################

hdfs-agent.sources.httpsource.channels = channel1
hdfs-agent.sinks.hdfssink.channel = channel1

For now I am starting small and testing it with just this:

[{"text": "Hi Flume this Node"}]

Could my batchSize/rollSize/rollCount settings be the problem here?

2 Answers

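The batchSize, rollSize and rollCount values are fine. Setting rollSize and rollCount to 0 disables size-based and count-based file rolling; files are still rolled on a timer controlled by hdfs.rollInterval, which defaults to 30 seconds.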
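The interaction between the roll settings can be modelled in a few lines. The Java sketch below is a hypothetical model of the documented HDFS sink semantics (a value of 0 disables that trigger; the documented defaults are hdfs.rollInterval = 30 s, hdfs.rollSize = 1024 bytes, hdfs.rollCount = 10). It is not Flume's actual sink code, and the RollPolicy class and its method are made up for illustration.

```java
/**
 * Hypothetical model of the HDFS sink's roll triggers as documented:
 * a value of 0 disables that trigger. This is NOT Flume's real code.
 */
final class RollPolicy {
    final long rollIntervalSec; // hdfs.rollInterval (documented default: 30)
    final long rollSizeBytes;   // hdfs.rollSize     (documented default: 1024)
    final long rollCount;       // hdfs.rollCount    (documented default: 10)

    RollPolicy(long rollIntervalSec, long rollSizeBytes, long rollCount) {
        this.rollIntervalSec = rollIntervalSec;
        this.rollSizeBytes = rollSizeBytes;
        this.rollCount = rollCount;
    }

    /** True if any enabled (non-zero) trigger has been reached. */
    boolean shouldRoll(long secondsOpen, long bytesWritten, long eventsWritten) {
        boolean byTime = rollIntervalSec > 0 && secondsOpen >= rollIntervalSec;
        boolean bySize = rollSizeBytes > 0 && bytesWritten >= rollSizeBytes;
        boolean byCount = rollCount > 0 && eventsWritten >= rollCount;
        return byTime || bySize || byCount;
    }

    public static void main(String[] args) {
        // The question's settings: rollSize = 0, rollCount = 0, rollInterval left at 30.
        RollPolicy questionConfig = new RollPolicy(30, 0, 0);
        System.out.println(questionConfig.shouldRoll(5, 1_000_000, 10_000)); // false
        System.out.println(questionConfig.shouldRoll(31, 10, 1));            // true
    }
}
```

With the question's settings, only the 30-second timer closes files. The model is simplified: Flume checks size and count as events are written and runs the time trigger as a scheduled task, whereas this sketch folds all three into one check.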

hdfs-agent.sources.httpsource.type should be set to org.apache.flume.source.http.HTTPSource.

The data sent to the HTTP source should be in this format:

[{"headers" : {"a":"b", "c":"d"},"body": "random_body"}, {"headers" : {"e": "f"},"body": "random_body2"}]
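If you want to keep the stock JSONHandler, one option is to carry your own JSON document as the string "body" of an event. The Java sketch below (Java 9+, no external libraries) builds that array format with minimal JSON string escaping; the FlumeEnvelope class and its methods are hypothetical helpers, not part of Flume.

```java
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

/**
 * Hypothetical helper: wraps arbitrary payloads (e.g. your own JSON documents)
 * as the string "body" of Flume events, producing the array format the stock
 * JSONHandler expects: [{"headers": {...}, "body": "..."}].
 */
final class FlumeEnvelope {

    /** Escapes a Java string as a JSON string literal, including the quotes. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Other control characters must be \\uXXXX-escaped in JSON.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    /** Builds one event object: {"headers": {...}, "body": "..."}. */
    static String event(Map<String, String> headers, String body) {
        StringJoiner h = new StringJoiner(", ", "{", "}");
        headers.forEach((k, v) -> h.add(quote(k) + ": " + quote(v)));
        return "{\"headers\": " + h + ", \"body\": " + quote(body) + "}";
    }

    /** Builds the JSON array of events that is POSTed to the HTTP source. */
    static String batch(List<String> events) {
        StringJoiner arr = new StringJoiner(", ", "[", "]");
        events.forEach(arr::add);
        return arr.toString();
    }

    public static void main(String[] args) {
        // The question's own document, carried as the body of a single event.
        String payload = batch(List.of(event(Map.of(), "{\"text\": \"Hi Flume this Node\"}")));
        System.out.println(payload);
        // [{"headers": {}, "body": "{\"text\": \"Hi Flume this Node\"}"}]
    }
}
```

In real code a JSON library would normally do the escaping; hand-rolling it here only keeps the example self-contained.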

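I tested sending the data you used ([{"text": "Hi Flume this Node"}]). Because it has no "body" attribute, nothing was appended to my file. But when I posted the following, the data was appended to my file: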

curl -X POST -H 'Content-Type: application/json; charset=UTF-8' -d '[{"headers": {"timestamp": "434324343", "host": "random_host.example.com", "field1": "val1"}, "body": "random_body"}]' http://localhost:5140
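To send the same request from code instead of curl, the JDK 11+ java.net.http client can be used. This is a sketch assuming the agent's HTTP source listens on localhost:5140 as in the question; PostToFlume is a hypothetical class name.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/**
 * Sketch of the curl call above using the JDK 11+ HTTP client.
 * Assumes the agent's HTTP source listens on localhost:5140.
 */
final class PostToFlume {

    /** Builds the POST request; the body must be a JSON array of Flume events. */
    static HttpRequest buildRequest(String url, String eventsJson) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/json; charset=UTF-8")
                // ofString encodes the body as UTF-8, matching the header.
                .POST(HttpRequest.BodyPublishers.ofString(eventsJson))
                .build();
    }

    public static void main(String[] args) throws Exception {
        String events = "[{\"headers\": {\"timestamp\": \"434324343\", "
                + "\"host\": \"random_host.example.com\", \"field1\": \"val1\"}, "
                + "\"body\": \"random_body\"}]";
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest("http://localhost:5140", events),
                      HttpResponse.BodyHandlers.ofString());
        // Expect 200 when the batch is accepted; a 4xx/5xx status means it was rejected.
        System.out.println(response.statusCode());
    }
}
```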

Hope this helps.

answered 2015-03-24T16:02:36.233

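As arathim pointed out, org.apache.flume.source.http.JSONHandler expects the Flume event format. If you want to send your own arbitrary JSON, you need to write your own handler. Here is an example of a handler that accepts any JSON: it treats each line of the request body as one JSON document and turns every valid line into one event. To use it, set hdfs-agent.sources.httpsource.handler to the handler's fully qualified class name and make the compiled class available on the agent's classpath.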

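import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.servlet.http.HttpServletRequest;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.http.HTTPBadRequestException;
import org.apache.flume.source.http.HTTPSourceHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Accepts arbitrary JSON: every line of the request body is validated as a
 * JSON document and, if valid, becomes the body of one Flume event.
 */
public class GenericJSONInputHandler implements HTTPSourceHandler {

    protected static final String TIMESTAMP = "timestamp";
    protected static final String TYPE = "type";
    private static final Logger LOG = LoggerFactory.getLogger(GenericJSONInputHandler.class);

    // Reused for validation; ObjectMapper is thread-safe once configured.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public GenericJSONInputHandler() {
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public List<Event> getEvents(HttpServletRequest request) throws Exception {
        // UTF-8 is the default for JSON. If no charset is specified, UTF-8 is
        // assumed; it must be set before getReader() is called to take effect.
        if (request.getCharacterEncoding() == null) {
            LOG.debug("Charset is null, using default charset UTF-8.");
            request.setCharacterEncoding("UTF-8");
        }
        BufferedReader reader = request.getReader();

        List<Event> eventList = new ArrayList<Event>();
        try {
            // One JSON document per line; pretty-printed JSON that spans
            // several lines would be rejected line by line.
            String json = reader.readLine();
            while (json != null) {
                // Logged inside the loop so an empty body cannot cause an NPE.
                LOG.debug("Received line with size " + json.length());
                List<Event> e = createEvents(json);
                if (e != null) {
                    eventList.addAll(e);
                }
                json = reader.readLine();
            }
        }
        catch (Exception ex) {
            throw new HTTPBadRequestException("Request has invalid JSON Syntax.", ex);
        }

        return eventList;
    }

    /** Wraps one valid JSON document in an event; returns null if it is invalid. */
    protected List<Event> createEvents(String json) {
        if (!isValidJSON(json)) {
            return null;
        }
        Map<String, String> headers = new HashMap<>();
        headers.put(TIMESTAMP, String.valueOf(System.currentTimeMillis()));
        headers.put(TYPE, "default");
        return Arrays.asList(EventBuilder.withBody(json.getBytes(StandardCharsets.UTF_8), headers));
    }

    public boolean isValidJSON(final String json) {
        // Walk every token; the parser throws on malformed input.
        try (JsonParser parser = MAPPER.getFactory().createParser(json)) {
            while (parser.nextToken() != null) {
                // no-op: tokenizing the whole document is the validation
            }
            return true;
        }
        catch (JsonParseException jpe) {
            LOG.warn("Invalid JSON: " + jpe.getMessage());
        }
        catch (IOException ioe) {
            LOG.warn("Could not read JSON: " + ioe.getMessage());
        }
        return false;
    }

    @Override
    public void configure(Context context) {
        // No configuration needed.
    }

}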
answered 2015-03-24T17:53:57.940