0

我正在尝试Spark Streaming + Kafka 集成指南(Kafka 代理版本 0.10.0 或更高版本)中的示例代码。代码可以正常运行,但我收不到任何记录。如果我运行 kafka-console-consumer.sh --from-beginning,我可以获得记录。有谁知道原因?我的代码如下:

val broker = "221.181.73.44:19092"
val topics = Array("connect-test")
val groupid = "SparkStreamingLoad3"
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> broker,
  "group.id" -> groupid,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "auto.offset.reset" -> "earliest", //earliest | latest
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

stream.print()

ssc.start()
ssc.awaitTermination()

我的 SBT 版本是:

version := "1.0"
scalaVersion := "2.10.6"
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming-kafka-0-10_2.10" % "2.1.0",
  "org.apache.spark" % "spark-core_2.10" % "2.1.0",
"org.apache.spark" % "spark-streaming_2.10" % "2.1.0",
"org.apache.kafka" % "kafka_2.10" % "0.10.2.1"
)

谢谢!

4

2 回答 2

1

最后,我解决了这个问题。这是答案:

  1. 主题中的数据是从控制台生产者生成的,它是一个字符串列表。但是,数据的格式是[Array[Byte], Array[Byte]]。不是[字符串,字符串]。因此,如果我使用 StringDeserializer,将不会收到任何数据。

  2. 我从控制台消费者源代码中了解到 writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit

RDD 中的键/值可以包含空值。就我而言,所有键都是空的。我使用以下代码获取数据:

stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](ssc, PreferConsistent, Subscribe[Array[Byte], Array[Byte]](topics, kafkaParams)) stream.map(rdd=>new String(Option (rdd.key()).getOrElse("null".getBytes))+ "|||demiter|||"+new String(Option(rdd.value()).getOrElse("null".getBytes))) 。打印()

于 2017-06-27T10:15:33.307 回答
0
val broker = "221.181.73.44:19092"

默认端口是9092,这可能是问题所在。

"auto.offset.reset" -> "earliest"并且"enable.auto.commit" -> false应该始终从主题日志的开头开始阅读,因为您的偏移量不会存储在任何地方。所以这没有问题。

另外,我们可以看到您使用的完整命令kafka-console-consumer.sh 吗?

于 2017-06-22T16:32:12.447 回答