我正在关注异常风暴拓扑。
java.lang.NoSuchMethodError: org.apache.kafka.common.network.NetworkSend.<init>(Ljava/lang/String;[Ljava/nio/ByteBuffer;)V
at kafka.network.RequestOrResponseSend.<init>(RequestOrResponseSend.scala:41) ~[stormjar.jar:?]
at kafka.network.RequestOrResponseSend.<init>(RequestOrResponseSend.scala:44) ~[stormjar.jar:?]
at kafka.network.BlockingChannel.send(BlockingChannel.scala:112) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:98) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) ~[stormjar.jar:?]
at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[stormjar.jar:?]
at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:81) ~[stormjar.jar:?]
at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:71) ~[stormjar.jar:?]
at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:135) ~[stormjar.jar:?]
at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:110) ~[stormjar.jar:?]
at org.apache.storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:71) ~[stormjar.jar:?]
at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[stormjar.jar:?]
at org.apache.storm.daemon.executor$fn__10727$fn__10742$fn__10773.invoke(executor.clj:654) ~[storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.2.2.jar:1.2.2]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
POM 配置:
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<!-- <version>0.10.0</version> -->
<version>1.2.2</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>log4j-core</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-api</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<!-- <version>0.10.0</version> -->
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.0.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
我正在使用已弃用的storm-kafka 库。如果这是上述异常的原因,那么让我知道如何使用storm-kafka-client 库创建kafka spout 并将自定义方案传递给它。
谢谢。