I followed this document, and it works well. Now I am trying to consume the connector data in Spark. Is there anything I can use as a reference? Since I am using Confluent, it differs quite a bit from the plain Kafka reference docs.
Here is the code I have so far. The problem is that the record value cannot be cast to java.lang.String (and I am not sure this is the right way to consume it):
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val brokers = "127.0.0.1:9092" // bootstrap.servers takes host:port, not an HTTP URL
val topics = List("postgres-accounts2")
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
//sparkConf.setMaster("spark://sda1:7077,sda2:7077")
sparkConf.setMaster("local[2]")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.registerKryoClasses(Array(classOf[org.apache.avro.generic.GenericData.Record])) // in Scala the Java inner class GenericData$Record is written GenericData.Record
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
// Create direct kafka stream with brokers and topics
//val topicsSet = topics.split(",")
val kafkaParams = Map[String, Object](
  "schema.registry.url" -> "http://127.0.0.1:8081",
  "bootstrap.servers" -> brokers,
  "key.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
  "value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
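// Note (my assumption): KafkaAvroDeserializer produces org.apache.avro.generic.GenericRecord
// objects, not Strings, so typing the stream [String, String] below may be what triggers the cast error.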
val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
val data = messages.map { record =>
  println(record)
  println("value : " + record.value().toString()) // throws java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.String
  //println(Json.parse(record.value() + ""))
  (record.key, record.value)
}
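For reference, here is my current guess at a fix, based on the assumption that KafkaAvroDeserializer returns Avro GenericRecord objects rather than Strings. This is only a sketch under that assumption, not something I have confirmed against the Confluent docs:

import org.apache.avro.generic.GenericRecord

// Type the stream to match what KafkaAvroDeserializer actually returns.
// Both key and value are assumed to be Avro here; if the key is a plain string,
// use StringDeserializer for the key and type the stream [String, GenericRecord].
val avroStream = KafkaUtils.createDirectStream[GenericRecord, GenericRecord](
  ssc,
  PreferConsistent,
  Subscribe[GenericRecord, GenericRecord](topics, kafkaParams)
)

val jsonData = avroStream.map { record =>
  // GenericRecord.toString renders the record as JSON text, so no cast is needed
  val key = Option(record.key).map(_.toString).orNull
  (key, record.value.toString)
}

If this is right, each value should print as the JSON rendering of the Avro record. Is this the recommended way, or is there a Confluent-specific connector for Spark I should be using instead?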