I want to read a Kafka topic and then write it to a Kudu table using Spark Streaming.
My first approach
// imports used by the snippet below
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu.KuduContext
import scala.collection.JavaConverters._
// sessions and contexts
val conf = new SparkConf().setMaster("local[2]").setAppName("TestMain")
val sparkSession = SparkSession.builder().config(conf).getOrCreate()
val sparkContext = sparkSession.sparkContext
val kuduContext = new KuduContext("...", sparkContext);
// structure
val schema: StructType = StructType(
  StructField("userNo", IntegerType, true) ::
  StructField("bandNo", IntegerType, false) ::
  StructField("ipv4", StringType, false) :: Nil);
// kudu - prepare table
kuduContext.deleteTable("test_table");
kuduContext.createTable("test_table", schema, Seq("userNo"), new CreateTableOptions()
  .setNumReplicas(1)
  .addHashPartitions(List("userNo").asJava, 3))
// get stream from kafka
val parsed = sparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("startingOffsets", "latest")
  .option("subscribe", "feed_api_band_get_popular_post_list")
  .load()
  .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
// write it to kudu
kuduContext.insertRows(parsed.toDF(), "test_table");
Now it complains:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
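As far as I can tell, the message means that a DataFrame created with readStream cannot be consumed by a batch-style call; it has to be started as a streaming query through writeStream ... start(). Below is a minimal sketch of that pattern, not a solution for Kudu: it assumes the built-in console sink as a stand-in, the spark-sql-kafka-0-10 package on the classpath, and a hypothetical object name StreamingStartSketch. The broker address is left elided as in my code above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical object name, for illustration only.
object StreamingStartSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("StreamingStartSketch")
      .getOrCreate()

    // Same schema as in the first approach.
    val schema = StructType(
      StructField("userNo", IntegerType, true) ::
      StructField("bandNo", IntegerType, false) ::
      StructField("ipv4", StringType, false) :: Nil)

    // Streaming DataFrame: nothing is read until a query is started.
    val parsed = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "...") // elided, as in the question
      .option("startingOffsets", "latest")
      .option("subscribe", "feed_api_band_get_popular_post_list")
      .load()
      .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
      .select("parsed_value.*") // flatten the struct into userNo, bandNo, ipv4

    // Assumption: the console sink stands in for Kudu here.
    // start() is what actually launches the streaming query.
    val query = parsed.writeStream
      .format("console")
      .outputMode("append")
      .start()

    query.awaitTermination()
  }
}
```

This only prints each micro-batch to stdout, so it shows how the query is started but leaves open how the rows should reach Kudu.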
My second approach
It seems to work if I change the code to use the traditional KafkaUtils.createDirectStream:
KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
).foreachRDD(rdd => {
  rdd.foreach(record => {
    // write to kudu.............
    println(record.value());
  })
});
ssc.start();
ssc.awaitTermination();
So, which approach is the right one? Or is there any way to make the first approach run?
The Spark version is 2.2.0.