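I am running a simple kafka-storm topology on a Linux server. Everything seemed to work fine while I was running it on my local Windows machine. However, as soon as I moved the code to production and tried to start the Storm topology, I ran into the following problem: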

java.lang.RuntimeException: java.nio.BufferUnderflowException
        at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:175) ~[storm-kafka-0.10.0.jar:0.10.0]
        at storm.kafka.PartitionManager.fill(PartitionManager.java:169) ~[storm-kafka-0.10.0.jar:0.10.0]
        at storm.kafka.PartitionManager.next(PartitionManager.java:131) ~[storm-kafka-0.10.0.jar:0.10.0]
        at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:141) ~[storm-kafka-0.10.0.jar:0.10.0]
        at backtype.storm.daemon.executor$fn__5624$fn__5639$fn__5670.invoke(executor.clj:607) ~[storm-core-0.10.0.jar:0.10.0]
        at backtype.storm.util$async_loop$fn__545.invoke(util.clj:479) [storm-core-0.10.0.jar:0.10.0]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.7.0_95]
Caused by: java.nio.BufferUnderflowException
        at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145) ~[?:1.7.0_95]
        at java.nio.ByteBuffer.get(ByteBuffer.java:694) ~[?:1.7.0_95]
        at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:40) ~[kafka_2.10-0.9.0.1.jar:?]
        at kafka.api.TopicData$.readFrom(FetchResponse.scala:95) ~[kafka_2.10-0.9.0.1.jar:?]
        at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169) ~[kafka_2.10-0.9.0.1.jar:?]
        at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:168) ~[kafka_2.10-0.9.0.1.jar:?]
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) ~[scala-library-2.10.5.jar:?]
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) ~[scala-library-2.10.5.jar:?]
        at scala.collection.immutable.Range.foreach(Range.scala:141) ~[scala-library-2.10.5.jar:?]
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) ~[scala-library-2.10.5.jar:?]
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) ~[scala-library-2.10.5.jar:?]
        at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:168) ~[kafka_2.10-0.9.0.1.jar:?]
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135) ~[kafka_2.10-0.9.0.1.jar:?]
        at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[kafka_2.10-0.9.0.1.jar:?]
        at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:165) ~[storm-kafka-0.10.0.jar:0.10.0]
        ... 7 more
10776 [Thread-18-temp] ERROR b.s.d.executor -
java.lang.RuntimeException: java.nio.BufferUnderflowException
        at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:175) ~[storm-kafka-0.10.0.jar:0.10.0]
        at storm.kafka.PartitionManager.fill(PartitionManager.java:169) ~[storm-kafka-0.10.0.jar:0.10.0]
        at storm.kafka.PartitionManager.next(PartitionManager.java:131) ~[storm-kafka-0.10.0.jar:0.10.0]
        at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:141) ~[storm-kafka-0.10.0.jar:0.10.0]
        at backtype.storm.daemon.executor$fn__5624$fn__5639$fn__5670.invoke(executor.clj:607) ~[storm-core-0.10.0.jar:0.10.0]
        at backtype.storm.util$async_loop$fn__545.invoke(util.clj:479) [storm-core-0.10.0.jar:0.10.0]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.7.0_95]
Caused by: java.nio.BufferUnderflowException
        at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145) ~[?:1.7.0_95]
        at java.nio.ByteBuffer.get(ByteBuffer.java:694) ~[?:1.7.0_95]
        at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:40) ~[kafka_2.10-0.9.0.1.jar:?]
        at kafka.api.TopicData$.readFrom(FetchResponse.scala:95) ~[kafka_2.10-0.9.0.1.jar:?]
        at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169) ~[kafka_2.10-0.9.0.1.jar:?]
        at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:168) ~[kafka_2.10-0.9.0.1.jar:?]
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) ~[scala-library-2.10.5.jar:?]
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) ~[scala-library-2.10.5.jar:?]
        at scala.collection.immutable.Range.foreach(Range.scala:141) ~[scala-library-2.10.5.jar:?]
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) ~[scala-library-2.10.5.jar:?]
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) ~[scala-library-2.10.5.jar:?]
        at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:168) ~[kafka_2.10-0.9.0.1.jar:?]
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135) ~[kafka_2.10-0.9.0.1.jar:?]
        at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[kafka_2.10-0.9.0.1.jar:?]
        at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:165) ~[storm-kafka-0.10.0.jar:0.10.0]
        ... 7 more
10811 [Thread-18-temp] ERROR b.s.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
        at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:336) [storm-core-0.10.0.jar:0.10.0]
        at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.6.0.jar:?]
        at backtype.storm.daemon.worker$fn__7184$fn__7185.invoke(worker.clj:532) [storm-core-0.10.0.jar:0.10.0]
        at backtype.storm.daemon.executor$mk_executor_data$fn__5523$fn__5524.invoke(executor.clj:261) [storm-core-0.10.0.jar:0.10.0]
        at backtype.storm.util$async_loop$fn__545.invoke(util.clj:489) [storm-core-0.10.0.jar:0.10.0]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.7.0_95]

Any pointers would be great, as I have been stuck on this problem for a while now.