
I'm new to Spark Streaming and Kafka, and I don't understand this runtime exception. I have already set up the Kafka server.

    Exception in thread "JobGenerator" java.lang.NoSuchMethodError: org.apache.spark.streaming.scheduler.InputInfoTracker.reportInfo(Lorg/apache/spark/streaming/Time;Lorg/apache/spark/streaming/scheduler/StreamInputInfo;)V
        at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:166)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
        at scala.Option.orElse(Option.scala:257)

Here is my code:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;

    import kafka.serializer.StringDecoder;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    import scala.Tuple2;

    public class TwitterStreaming {
        // Kafka setup
        public static final String ZKQuorum = "localhost:2181";
        public static final String ConsumerGroupID = "ingi2145-analytics";
        public static final String ListTopics = "newTweet";
        public static final String ListBrokers = "localhost:9092"; // I'm not sure about ...

        @SuppressWarnings("deprecation")
        public static void main(String[] args) throws Exception {
            // Location of the Spark directory
            String sparkHome = "usr/local/spark";
            // URL of the Spark cluster
            String sparkUrl = "local[4]";
            // Location of the required JAR files
            String jarFile = "target/analytics-1.0.jar";

            // Create the Spark streaming context with a 1-second batch interval
            JavaStreamingContext jssc = new JavaStreamingContext(
                sparkUrl, "Streaming", new Duration(1000), sparkHome, new String[]{jarFile});

            // Kafka topics to read and the brokers to connect to
            HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(ListTopics.split(",")));
            HashMap<String, String> kafkaParams = new HashMap<String, String>();
            kafkaParams.put("metadata.broker.list", ListBrokers);

            //JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(ssc, ZKQuorum, ConsumerGroupID, mapPartitionsPerTopics);
            // Create direct Kafka stream with brokers and topics
            JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
            );

            // Extract the JSON payload (the value of each key/value message)
            JavaDStream<String> json = messages.map(
                new Function<Tuple2<String, String>, String>() {
                    public String call(Tuple2<String, String> tuple2) {
                        return tuple2._2();
                    }
                });

            // (the rest of the job is not shown in the original post)
        }
    }

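The goal of the project is to compute the top 10 hashtags from a Twitter stream that is delivered through a Kafka queue. The code works without Kafka. Do you have any idea what the problem is?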


1 Answer


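I had the same problem, and the cause was the Spark version I was using. I tried 1.5, then 1.4, and the version that finally worked for me was 1.6. So make sure the Kafka version you use is compatible with your Spark version. In my case, I am using Kafka 2.10-0.10.1.1 with spark-1.6.0-bin-hadoop2.3.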
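A `NoSuchMethodError` like the one in the question usually means a class was compiled against a different version of a library than the one present at runtime. One way to check this is to print which jar each class from the stack trace is actually loaded from. The following is only a rough sketch: `ClasspathCheck` and `whereIs` are hypothetical names that are not part of Spark or of the original post, and the program has to be run with the same classpath as the streaming job for the output to mean anything.

```java
import java.security.CodeSource;

// Hypothetical helper (not part of Spark): reports where a class was loaded from.
class ClasspathCheck {

    /** Returns the jar or directory a class was loaded from, or a short note if unknown. */
    static String whereIs(String className) {
        try {
            // initialize=false: only the Class object is needed, not its static initializers
            Class<?> c = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            // Classes loaded by the bootstrap loader (e.g. java.lang.String) have no code source
            if (src == null || src.getLocation() == null) {
                return "bootstrap or unknown location";
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "not on classpath";
        }
    }

    public static void main(String[] args) {
        // Class names taken from the stack trace in the question
        String[] names = {
            "org.apache.spark.streaming.scheduler.InputInfoTracker",
            "org.apache.spark.streaming.kafka.DirectKafkaInputDStream"
        };
        for (String name : names) {
            System.out.println(name + " -> " + whereIs(name));
        }
    }
}
```

If the two classes come from jars whose file names carry different Spark versions, the Kafka connector and the Spark Streaming core do not match, and aligning them on one version is the likely fix.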

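Also (very important), make sure that no permission-denied errors show up in your log files. You must grant the appropriate permissions on the folders Spark uses; otherwise you may get many errors that have nothing to do with the application itself and are caused by incorrect security settings.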
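The permission check can be done before starting the job. This is a minimal sketch using only the standard library; `DirAccessCheck` and `describe` are hypothetical names, and which directories to check is an assumption here. By default it looks at the JVM temp directory; pass the folders your own Spark setup writes to as arguments.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: reports whether the current user can use a directory.
class DirAccessCheck {

    /** Returns "ok" if the directory exists and is readable, writable and traversable. */
    static String describe(Path dir) {
        if (!Files.isDirectory(dir)) {
            return "missing or not a directory";
        }
        boolean read = Files.isReadable(dir);
        boolean write = Files.isWritable(dir);
        boolean traverse = Files.isExecutable(dir);
        if (read && write && traverse) {
            return "ok";
        }
        return "insufficient permissions (read=" + read + ", write=" + write
                + ", traverse=" + traverse + ")";
    }

    public static void main(String[] args) {
        // Assumption: with no arguments, check only the JVM temp directory
        String[] dirs = args.length > 0 ? args : new String[]{System.getProperty("java.io.tmpdir")};
        for (String d : dirs) {
            System.out.println(d + " -> " + describe(Paths.get(d)));
        }
    }
}
```

Run it as the same user that launches the Spark job, since the result depends on who is asking.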

Answered on 2017-02-02T19:00:02.777