我可以读取嵌套的子文件夹并从所有叶子 JSON 文件创建一个静态 DataFrame 吗?DataFrame 阅读器有选项吗?
是的,由于您的目录结构是常规的(YYYY/MM/DD/HH
),您可以使用通配符提供路径直到叶节点,如下所示
val spark: SparkSession = SparkSession.builder.master("local").getOrCreate
val jsonDf = spark.read.format("json").json("base/path/*/*/*/*/*.json")
// Here */*/*/*/*.json maps to YYYY/MM/DD/HH/filename.json
当然,更喜欢直接使用 Kinesis 流,但此连接器上没有 2.0 的日期,因此 Firehose->S3 是临时的。
我可以看到有一个用于Kinesis 与 Spark Streaming 集成的库。因此,您可以直接读取流数据并对其执行 SQL 操作,而无需从 S3 中读取。
groupId = org.apache.spark
artifactId = spark-streaming-kinesis-asl_2.11
version = 2.0.0
使用 Spark Streaming 和 SQL 的示例代码
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
val kinesisStream = KinesisUtils.createStream(
streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
kinesisStream.foreachRDD { rdd =>
// Get the singleton instance of SparkSession
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
// Convert RDD[String] to DataFrame
val jsonDf = rdd.toDF() // or rdd.toDF("specify schema/columns here")
// Create a temporary view with DataFrame
jsonDf.createOrReplaceTempView("json_data_tbl")
//As we have DataFrame and SparkSession object we can perform most
//of the Spark SQL stuff here
}