1

我有一个 Apache 访问日志文件,其中包含一些数据并且还在不断增加。我想使用 Apache Spark Streaming API 分析这些数据。

Spark 对我来说是新的,我创建了一个程序,在其中我使用 jssc.textFileStream(directory)函数来获取日志数据。但它不符合我的要求。

请向我建议一些使用 spark 分析该日志文件的方法。

这是我的代码。

SparkConf conf = new SparkConf()
                .setMaster("spark://192.168.1.9:7077")
                .setAppName("log streaming")
                .setSparkHome("/usr/local/spark")
                .setJars(new String[] { "target/sparkstreamingdemo-0.0.1.jar" });
        StreamingContext ssc = new StreamingContext(conf, new Duration(5000));
        DStream<String> filerdd = ssc.textFileStream("/home/user/logs");
        filerdd.print();
        ssc.start();
        ssc.awaitTermination();

此代码不会从现有文件返回任何数据。这仅在我创建新文件时有效,但是当我更新该新文件时,程序再次不会返回更新的数据。

4

1 回答 1

3

如果文件被实时修改,您可以使用来自 Apache Commons IO 的Tailer 。这是最简单的示例:

     public void readLogs(File f, long delay) {
        TailerListener listener = new MyTailerListener();
        Tailer tailer = new Tailer(f, listener, delay);

        // stupid executor impl. for demo purposes
        Executor executor = new Executor() {
            public void execute(Runnable command) {
                command.run();
             }
        };
        executor.execute(tailer);       
    }

    public class MyTailerListener extends TailerListenerAdapter {
        public void handle(String line) {
            System.out.println(line);
        }
    }

上面的代码可以用作Apache Flume的日志阅读器并用作源代码。然后您需要配置 Flume sink 将收集到的日志重定向到 Spark 流,并应用 Spark 分析来自 Flume 流的数据 ( http://spark.apache.org/docs/latest/streaming-flume-integration.html )

这篇文章中有关 Flume 设置的更多详细信息: real time log processing using apache spark streaming

于 2015-02-24T02:02:32.287 回答