16

我正在尝试使用 eclipse(使用 maven conf)和 2 个工作人员执行下面的代码,每个工作人员都有 2 个核心,或者也尝试使用 spark-submit。

public class StreamingWorkCount implements Serializable {

    public static void main(String[] args) {
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
        JavaStreamingContext jssc = new JavaStreamingContext(
                "spark://192.168.1.19:7077", "JavaWordCount",
                new Duration(1000));
        JavaDStream<String> trainingData = jssc.textFileStream(
                "/home/bdi-user/kaushal-drive/spark/data/training").cache();
        trainingData.foreach(new Function<JavaRDD<String>, Void>() {

            public Void call(JavaRDD<String> rdd) throws Exception {
                List<String> output = rdd.collect();
                System.out.println("Sentences Collected from files " + output);
                return null;
            }
        });

        trainingData.print();
        jssc.start();
        jssc.awaitTermination();
    }
}

并记录该代码

15/01/22 21:57:13 INFO FileInputDStream: New files at time 1421944033000 ms:

15/01/22 21:57:13 INFO JobScheduler: Added jobs for time 1421944033000 ms
15/01/22 21:57:13 INFO JobScheduler: Starting job streaming job 1421944033000 ms.0 from job set of time 1421944033000 ms
15/01/22 21:57:13 INFO SparkContext: Starting job: foreach at StreamingKMean.java:33
15/01/22 21:57:13 INFO DAGScheduler: Job 3 finished: foreach at StreamingKMean.java:33, took 0.000094 s
Sentences Collected from files []
-------------------------------------------
15/01/22 21:57:13 INFO JobScheduler: Finished job streaming job 1421944033000 ms.0 from job set of time 1421944033000 ms
Time: 1421944033000 ms
-------------------------------------------15/01/22 21:57:13 INFO JobScheduler: Starting job streaming job 1421944033000 ms.1 from job set of time 1421944033000 ms


15/01/22 21:57:13 INFO JobScheduler: Finished job streaming job 1421944033000 ms.1 from job set of time 1421944033000 ms
15/01/22 21:57:13 INFO JobScheduler: Total delay: 0.028 s for time 1421944033000 ms (execution: 0.013 s)
15/01/22 21:57:13 INFO MappedRDD: Removing RDD 5 from persistence list
15/01/22 21:57:13 INFO BlockManager: Removing RDD 5
15/01/22 21:57:13 INFO FileInputDStream: Cleared 0 old files that were older than 1421943973000 ms: 
15/01/22 21:57:13 INFO FileInputDStream: Cleared 0 old files that were older than 1421943973000 ms: 
15/01/22 21:57:13 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()

问题是,我没有从目录中的文件中获取数据。请帮我。

4

7 回答 7

13

尝试使用另一个目录,然后在作业运行时将这些文件复制到该目录。

于 2015-01-22T19:22:19.577 回答
5

有同样的问题。这是我的代码:

lines = jssc.textFileStream("file:///Users/projects/spark/test/data');

TextFileSTream非常敏感;我最终做的是:

1. Run Spark program
2. touch datafile
3. mv datafile datafile2
4. mv datafile2  /Users/projects/spark/test/data

并且做到了。

于 2015-10-09T05:31:46.233 回答
2

我认为您需要添加该方案,即file://hdfs://在您的路径前面。


撤消对我的评论的编辑,因为:事实上file://hdfs://它需要添加到路径“前面”,因此总路径变为file:///tmp/file.txtor hdfs:///user/data。如果配置中没有设置 NameNode,则后者需要为hdfs://host:port/user/data.

于 2015-01-23T07:51:30.430 回答
0

我一直在挠头好几个小时,对我有用的是

于 2021-01-29T16:04:00.433 回答
0

JavaDoc 建议函数只流式传输新文件。

参考: https://spark.apache.org/docs/1.0.1/api/java/org/apache/spark/streaming/api/java/JavaStreamingContext.html#textFileStream(java.lang.String)

创建一个输入流,用于监控与 Hadoop 兼容的文件系统中的新文件并将它们作为文本文件读取(使用 LongWritable 的键、Text 的值和 TextInputFormat 的输入格式)。必须通过将文件从同一文件系统中的另一个位置“移动”它们来将文件写入受监视的目录。以 . 开头的文件名 被忽略。

于 2016-04-25T21:37:14.873 回答
0

textFileStream只能在文件夹中的文件被添加更新时监控文件夹。

如果您只想读取文件,则可以使用SparkContext.textFile.

于 2016-11-29T03:57:02.107 回答
0

您必须考虑到 Spark Streaming 只会读取目录中的新文件,不会读取更新的文件(一旦它们在目录中),而且它们都必须具有相同的格式。

资源

于 2017-03-11T19:31:14.747 回答