27

我正在尝试使用 Twitter 作为源执行 Spark Streaming 示例,如下所示:

public static void main (String.. args) {

    SparkConf conf = new SparkConf().setAppName("Spark_Streaming_Twitter").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);       
        JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(2));      
        JavaSQLContext sqlCtx = new JavaSQLContext(sc);     


        String[] filters = new String[] {"soccer"};

        JavaReceiverInputDStream<Status> receiverStream = TwitterUtils.createStream(jssc,filters);



         jssc.start();
         jssc.awaitTermination();

}

但我收到以下异常

Exception in thread "main" java.lang.AssertionError: assertion failed: No output streams registered, so nothing to execute
    at scala.Predef$.assert(Predef.scala:179)
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:158)
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:416)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:437)
    at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:501)
    at org.learning.spark.TwitterStreamSpark.main(TwitterStreamSpark.java:53)

任何建议如何解决这个问题?

4

3 回答 3

45

当调用输出运算符时,它会触发流的计算。

如果 DStream 上没有输出运算符,则不会调用任何计算。基本上你需要在流上调用以下任何方法

print()
foreachRDD(func)
saveAsObjectFiles(prefix, [suffix])
saveAsTextFiles(prefix, [suffix])
saveAsHadoopFiles(prefix, [suffix])

http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations

如果需要,您还可以先应用任何转换,然后也可以输出函数。

于 2014-07-04T10:24:28.733 回答
3

线程“主”java.lang.AssertionError 中的异常:断言失败:未注册输出流,因此没有可执行的操作

TL;DR使用可用的输出运算符之一,例如print, saveAsTextFilesor foreachRDD(或较少使用的saveAsObjectFilesor saveAsHadoopFiles)。

换句话说,您必须在代码的以下几行之间使用输出运算符:

JavaReceiverInputDStream<Status> receiverStream = TwitterUtils.createStream(jssc,filters);
// --> The output operator here <--
jssc.start();

引用 Spark 官方文档在 DStreams 上的输出操作(突出显示我的):

输出操作允许 DStream 的数据被推送到外部系统,如数据库或文件系统。由于输出操作实际上允许转换后的数据被外部系统使用,它们触发了所有 DStream 转换的实际执行(类似于 RDD 的操作)

关键是,如果没有输出运算符,您将“没有注册输出流,因此没有可执行的操作”

正如一位评论者所注意到的,您必须在开始之前使用输出转换,例如printor 。foreachRDDStreamingContext


在内部,每当您使用可用的输出运算符之一(例如printor foreach)时,DStreamGraph都会请求添加输出流

您可以在创建新的 ForEachDStream 并随后注册时找到注册(这正是将其添加为输出流)。

于 2017-12-25T21:01:41.817 回答
1

它也 -错误地- 未能指责这个问题,但真正的原因是来自流输入的滑动窗口持续时间和 RDD 时间窗口之间 的非倍数。它只记录一个警告:你修复它,上下文停止失败:D

于 2016-07-14T16:47:24.913 回答