我是火花流和卡夫卡的新手,我不明白这个运行时异常。我已经设置了 kafka 服务器。
Exception in thread "JobGenerator" java.lang.NoSuchMethodError: org.apache.spark.streaming.scheduler.InputInfoTracker.reportInfo(Lorg/apache/spark/streaming/Time;Lorg/apache/spark/streaming/scheduler/StreamInputInfo;)V
at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:166)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
这是我的代码
public class TwitterStreaming {
// setup kafka :
public static final String ZKQuorum = "localhost:2181";
public static final String ConsumerGroupID = "ingi2145-analytics";
public static final String ListTopics = "newTweet";
public static final String ListBrokers = "localhost:9092"; // I'm not sure about ...
@SuppressWarnings("deprecation")
public static void main(String[] args) throws Exception {
// Location of the Spark directory
String sparkHome = "usr/local/spark";
// URL of the Spark cluster
String sparkUrl = "local[4]";
// Location of the required JAR files
String jarFile = "target/analytics-1.0.jar";
// Generating spark's streaming context
JavaStreamingContext jssc = new JavaStreamingContext(
sparkUrl, "Streaming", new Duration(1000), sparkHome, new String[]{jarFile});
// Start kafka stream
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(ListTopics.split(",")));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", ListBrokers);
//JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(ssc, ZKQuorum, ConsumerGroupID, mapPartitionsPerTopics);
// Create direct kafka stream with brokers and topics
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
// get the json file :
JavaDStream<String> json = messages.map(
new Function<Tuple2<String, String>, String>() {
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
该项目的目的是使用 kafka 队列从 twitter 流中计算出 10 个最佳主题标签。代码在没有 kakfa 的情况下工作。你有什么问题的想法吗?