8

Kinesis firehose 管理文件的持久性,在本例中为时间序列 JSON,到按 YYYY/MM/DD/HH 分区的文件夹层次结构中(以 24 编号为小时)......很棒。

那么如何使用 Spark 2.0 读取这些嵌套的子文件夹并从所有叶子 json 文件创建一个静态数据框?数据框阅读器是否有“选项”?

我的下一个目标是让它成为一个流式 DF,其中由 Firehose 持久化到 s3 中的新文件使用 Spark 2.0 中的新结构化流自然成为流式数据帧的一部分。我知道这都是实验性的——希望有人以前使用过 S3 作为流文件源,其中数据被划分到如上所述的文件夹中。当然更喜欢直接的 Kinesis 流,但是这个连接器上没有 2.0 的日期,所以 Firehose->S3 是临时的。

ND:我正在使用 databricks,它将 S3 挂载到 DBFS 中,但当然也很容易成为 EMR 或其他 Spark 提供程序。如果一个可以共享的笔记本也很高兴看到一个例子。

干杯!

4

2 回答 2

6

我可以读取嵌套的子文件夹并从所有叶子 JSON 文件创建一个静态 DataFrame 吗?DataFrame 阅读器有选项吗?

是的,由于您的目录结构是常规的(YYYY/MM/DD/HH),您可以使用通配符提供路径直到叶节点,如下所示

val spark: SparkSession = SparkSession.builder.master("local").getOrCreate

val jsonDf = spark.read.format("json").json("base/path/*/*/*/*/*.json")
// Here */*/*/*/*.json maps to YYYY/MM/DD/HH/filename.json 

当然,更喜欢直接使用 Kinesis 流,但此连接器上没有 2.0 的日期,因此 Firehose->S3 是临时的。

我可以看到有一个用于Kinesis 与 Spark Streaming 集成的库。因此,您可以直接读取流数据并对其执行 SQL 操作,而无需从 S3 中读取。

groupId = org.apache.spark
artifactId = spark-streaming-kinesis-asl_2.11
version = 2.0.0

使用 Spark Streaming 和 SQL 的示例代码

import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

val kinesisStream = KinesisUtils.createStream(
 streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
 [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)

kinesisStream.foreachRDD { rdd =>

  // Get the singleton instance of SparkSession
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  // Convert RDD[String] to DataFrame
  val jsonDf = rdd.toDF() // or rdd.toDF("specify schema/columns here")

  // Create a temporary view with DataFrame
  jsonDf.createOrReplaceTempView("json_data_tbl")

  //As we have DataFrame and SparkSession object we can perform most 
  //of the Spark SQL stuff here
}
于 2016-12-16T15:12:33.590 回答
3

全面披露:我为 Databricks 工作,但我不代表他们在 Stack Overflow 上。

那么如何使用 Spark 2.0 读取这些嵌套的子文件夹并从所有叶子 json 文件创建一个静态数据框?数据框阅读器是否有“选项”?

DataFrameReader 支持加载序列。请参阅def json(paths: String*): DataFrame的文档。您可以指定序列、使用通配模式或以编程方式构建它(推荐):

val inputPathSeq = Seq[String]("/mnt/myles/structured-streaming/2016/12/18/02", "/mnt/myles/structured-streaming/2016/12/18/03")
val inputPathGlob = "/mnt/myles/structured-streaming/2016/12/18/*"
val basePath = "/mnt/myles/structured-streaming/2016/12/18/0"
val inputPathList = (2 to 4).toList.map(basePath+_+"/*.json")

我知道这都是实验性的——希望有人以前使用过 S3 作为流文件源,其中数据被分区到如上所述的文件夹中。当然更喜欢直接的 Kinesis 流,但是这个连接器上没有 2.0 的日期,所以 Firehose->S3 是临时的。

由于您使用的是 DBFS,我将假设从 Firehose 流式传输数据的 S3 存储桶已安装到 DBFS。如果您需要帮助将 S3 存储桶安装到 DBFS,请查看 Databricks 文档。获得上述输入路径后,您可以简单地将文件加载到静态或流式数据帧中:

静止的

val staticInputDF = 
  spark
    .read
    .schema(jsonSchema)
    .json(inputPathSeq : _*)

staticInputDF.isStreaming
res: Boolean = false

流媒体

val streamingInputDF = 
  spark
    .readStream                       // `readStream` instead of `read` for creating streaming DataFrame
    .schema(jsonSchema)               // Set the schema of the JSON data
    .option("maxFilesPerTrigger", 1)  // Treat a sequence of files as a stream by picking one file at a time
    .json(inputPathSeq : _*)

streamingCountsDF.isStreaming
res: Boolean = true

其中大部分内容直接取自有关结构化流的 Databricks 文档。甚至还有一个可以直接导入 Databricks 的笔记本示例。

于 2016-12-18T22:16:49.680 回答