我正在尝试在 https://github.com/TheHydroImpulse/storm-kafka-starter编译和运行storm-kafka-starter项目
KafkaTopology 的主要功能如下所示:
public class KafkaTopology {
public static void main(String[] args) throws Exception {
List<String> hosts = new ArrayList<String>();
hosts.add("localhost");
SpoutConfig kafkaConf = new SpoutConfig(StaticHosts.fromHostString(hosts,1),
"test-topic","/kafkastorm","discovery");
kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaConf.forceStartOffsetTime(-2);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", kafkaSpout, 2);
builder.setBolt("printer", new PrinterBolt()).shuffleGrouping("spout");
Config config = new Config();
config.setDebug(true);
if(args!=null && args.length > 0) {
config.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], config, builder.createTopology());
}
else {
config.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("kafka", config, builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
}
}
jar 使用 maven 编译。但是在运行拓扑时,我得到了错误:
Exception in thread "main" java.lang.NoClassDefFoundError:
storm/kafka/KafkaConfig$BrokerHosts
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2451)
at java.lang.Class.getMethod0(Class.java:2694)
at java.lang.Class.getMethod(Class.java:1622)
at sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486)
Caused by: java.lang.ClassNotFoundException: storm.kafka.KafkaConfig$BrokerHosts
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
... 6 more
本地存储库在storm-kafka jar 中确实有BrokerHosts,我已经在我的java 文件中导入了KafkaConfig 库。我无法弄清楚错误的原因。任何建议,将不胜感激。