I'm trying to stream from an HDFS folder with Spark Streaming. I tried:
```java
SparkConf sparkConf = new SparkConf()
        .setMaster("spark://quickstart.cloudera:7077")
        .setAppName("BigData");
JavaStreamingContext ssc =
        new JavaStreamingContext(sparkConf, new Duration(2000));
JavaDStream<String> lines = ssc.textFileStream("hdfs://user/cloudera/events/");
```
To check that it is the right folder, I ran:

```
hadoop fs -ls
```
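The returned JavaDStream does not contain any data, and I don't get any errors. The folder already contains some files.

Is there another way to check the folder path? Is there anything else I should check?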
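One thing worth ruling out when checking the folder path: in a URI, the text right after `hdfs://` is the authority (NameNode host and port), not the first directory. The sketch below uses only `java.net.URI` to show how the path from the question is split. The class name is hypothetical, and the host `quickstart.cloudera` with port `8020` in the third URI is an assumption about the Cloudera quickstart VM, not a value taken from the question. Hadoop's `Path` is built on `java.net.URI`, and a URI that has a scheme but no authority (`hdfs:///...`) should fall back to the filesystem configured in `fs.defaultFS`.

```java
import java.net.URI;

public class HdfsUriCheck {
    // Splits an HDFS-style URI into scheme, authority (host[:port]) and path.
    static String describe(String uri) {
        URI u = URI.create(uri);
        return "scheme=" + u.getScheme()
                + " authority=" + u.getAuthority()
                + " path=" + u.getPath();
    }

    public static void main(String[] args) {
        // Two slashes: "user" is parsed as the host, not as a directory.
        System.out.println(describe("hdfs://user/cloudera/events/"));
        // Three slashes: no authority, so the default filesystem would be used.
        System.out.println(describe("hdfs:///user/cloudera/events/"));
        // Explicit host and port (assumed values for the quickstart VM).
        System.out.println(describe("hdfs://quickstart.cloudera:8020/user/cloudera/events/"));
    }
}
```

For the original URI this prints `scheme=hdfs authority=user path=/cloudera/events/`, i.e. the directory looked up is `/cloudera/events/` on a host named `user`, not `/user/cloudera/events/`. Separately, `textFileStream` only processes files that appear in the monitored directory after `ssc.start()`; files already in the folder are not read, so a non-empty folder can still produce empty batches.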
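Note: I also tried reading from local files and then putting them into a stream, but I still get an empty input (this is the code for that attempt). I checked the location of the files on my machine with spark-shell. The input.print() line throws a java.io.IOException...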
```java
System.out.println("Create context(es)");
SparkConf sparkConf = new SparkConf()
        .setMaster("spark://quickstart.cloudera:7077")
        .setAppName("BigData")
        .setSparkHome("/usr/lib/spark")
        .setJars(new String[]{"target/standard-to-self-explicit-0.0.1-SNAPSHOT.jar"});
JavaSparkContext sc = new JavaSparkContext(sparkConf);

SparkConf sparkConf2 = new SparkConf()
        .setMaster("spark://quickstart.cloudera:7077")
        .setAppName("BigData2")
        .setJars(new String[]{"target/standard-to-self-explicit-0.0.1-SNAPSHOT.jar"});
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf2, new Duration(2000));

System.out.println("Set input file");
JavaRDD<String> inputRDD1 = sc.textFile(inputFile + "1.json");
JavaRDD<String> inputRDD2 = sc.textFile(inputFile + "2.json");
Queue<JavaRDD<String>> inputRDDQueue = new LinkedList<JavaRDD<String>>();
inputRDDQueue.add(inputRDD1);
inputRDDQueue.add(inputRDD2);
System.out.println("debug 1: " + inputRDD1.toDebugString());
//System.out.println("first 1: " + inputRDD1.take(3).toString());

JavaDStream<String> input = ssc.queueStream(inputRDDQueue);
input.print();
ssc.start();
ssc.awaitTermination();
```
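For the local-file attempt, a plain-Java pre-check can confirm that the files passed to `sc.textFile` actually exist, are readable and are non-empty. The class name and the `/tmp/input` default prefix are hypothetical; the prefix is assumed to play the role of `inputFile` in the code above. The sketch also prints each file as an absolute `file:` URI, because on a cluster a path without a scheme is resolved against the default filesystem (normally HDFS on a Cloudera setup), and a local file read by Spark must exist at the same path on every worker node, not only on the driver machine.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class LocalInputCheck {
    // Returns one message per problematic input file; an empty list means all files look usable.
    static List<String> check(String inputPrefix, String... suffixes) throws IOException {
        List<String> problems = new ArrayList<>();
        for (String suffix : suffixes) {
            Path p = Paths.get(inputPrefix + suffix);
            if (!Files.isRegularFile(p)) {
                problems.add(p + " does not exist or is not a file");
            } else if (!Files.isReadable(p)) {
                problems.add(p + " is not readable");
            } else if (Files.size(p) == 0) {
                problems.add(p + " is empty");
            }
        }
        return problems;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical default prefix; pass the real inputFile value as the first argument.
        String inputFile = args.length > 0 ? args[0] : "/tmp/input";
        String[] suffixes = {"1.json", "2.json"};
        List<String> problems = check(inputFile, suffixes);
        if (problems.isEmpty()) {
            for (String suffix : suffixes) {
                // Fully qualified local URI, e.g. file:///tmp/input1.json
                System.out.println(Paths.get(inputFile + suffix).toAbsolutePath().toUri());
            }
        } else {
            problems.forEach(System.out::println);
        }
    }
}
```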
In addition, the log for the second version shows a problem with the input:
```
ERROR JobScheduler: Error running job streaming job 1416306522000 ms.1
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 14.0 failed 4 times, most recent failure: Lost task 0.3 in stage 14.0 (TID 78, quickstart.cloudera): java.io.IOException: unexpected exception type
    java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
    java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
```
Thanks!