0

我尝试使用 Confluent 平台并使用此代码作为示例向 REST 端点发出高级 Kafka 请求。

我使用以下 Kafka 参数:

val kafkaParams = Map(
  "bootstrap.servers" -> "localhost:9092",
  "schema.registry.url" -> "http://localhost:8081",
  "group.id" -> "EventConsumer",
  "auto.offset.reset" -> "smallest"
)

这是我尝试运行代码时遇到的错误。错误发生在以下行:

@transient val kafkaStream: DStream[(String, Object)] =
  KafkaUtils.createDirectStream[String, Object, StringDecoder, KafkaAvroDecoder](
    ssc, kafkaParams, Set(topic)
  )

线程“主”org.apache.spark.SparkException 中的异常:java.nio.channels.ClosedChannelException org.apache.spark.SparkException:在 org.apache 上找不到 Set([test-topic,0]) 的领导者偏移量.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)在 scala.util.Either.fold(Either.scala:98) 在 org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365) 在 org.apache.spark.streaming.kafka.KafkaUtils$ .getFromOffsets(KafkaUtils.scala:222) at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484) at kafka.EventsConsumer$.delayedEndpoint$kafka$EventsConsumer$1(EventsConsumer.scala:53)在卡夫卡。EventsConsumer$delayedInit$body.apply(EventsConsumer.scala:22) at scala.Function0$class.apply$mcV$sp(Function0.scala:34) at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala: 12) 在 scala.App$$anonfun$main$1.apply(App.scala:76) 在 scala.App$$anonfun$main$1.apply(App.scala:76) 在 scala.collection.immutable.List.foreach (List.scala:381) at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) at scala.App$class.main(App.scala:76) at kafka.EventsConsumer$.main(EventsConsumer .scala:22) 在 kafka.EventsConsumer.main(EventsConsumer.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl。com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) 的 java.lang.reflect.Method.invoke(Method.java:498) 的调用(DelegatingMethodAccessorImpl.java:43)

更新:

我尝试更改localhost为IP,但仍然遇到同样的问题。

4

1 回答 1

0

看起来领导者不适用于主题分区。尝试描述主题并检查是否有任何领导者可用于 test-topic 的分区 0。如果分区的所有副本都关闭,则会发生这种情况。如果您的复制因子为 1,那么这是最可能的原因。

于 2016-09-06T02:38:23.457 回答