我遇到了带 Spark 的结构化流,它有一个从 S3 存储桶持续消费并将处理后的结果写入 MySQL 数据库的示例。
// Read data continuously from an S3 location
val inputDF = spark.readStream.json("s3://logs")
// Do operations using the standard DataFrame API and write to MySQL
inputDF.groupBy($"action", window($"time", "1 hour")).count()
.writeStream.format("jdbc")
.start("jdbc:mysql//...")
这如何与Spark Kafka Streaming一起使用?
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
有没有办法在不使用的情况下结合这两个例子stream.foreachRDD(rdd => {})
?