5

我开始学习 Spark,并且很难理解 Spark 中结构化流背后的合理性。结构化流将所有到达的数据视为无界输入表,其中数据流中的每个新项目都被视为表中的新行。我有以下代码可以将传入的文件读入csvFolder.

val spark = SparkSession.builder.appName("SimpleApp").getOrCreate()

val csvSchema = new StructType().add("street", "string").add("city", "string")
.add("zip", "string").add("state", "string").add("beds", "string")
.add("baths", "string").add("sq__ft", "string").add("type", "string")
.add("sale_date", "string").add("price", "string").add("latitude", "string")
.add("longitude", "string")

val streamingDF = spark.readStream.schema(csvSchema).csv("./csvFolder/")

val query = streamingDF.writeStream
  .format("console")
  .start()

如果我将一个 1GB 的文件转储到该文件夹​​会发生什么。根据规范,流式作业每隔几毫秒触发一次。如果 Spark 在下一瞬间遇到这么大的文件,在尝试加载文件时会不会内存不足。还是自动批处理?如果是,这个批处理参数是否可配置?

4

1 回答 1

7

例子

关键思想是将任何数据流视为无界表:添加到流中的新记录就像附加到表中的行。 在此处输入图像描述 这使我们可以将批处理和流数据都视为表。由于表和 DataFrames/Datasets 在语义上是同义词,因此类似批处理的 DataFrame/Dataset 查询可以应用于批处理数据和流数据。

在结构化流模型中,这是执行此查询的方式。 在此处输入图像描述

问题:如果Spark在下一瞬间遇到这么大的文件,在尝试加载文件时会不会内存不足。还是自动批处理?如果是,这个批处理参数是否可配置?

回答: OOM 没有意义,因为它是 RDD(DF/DS) 延迟初始化的。当然,您需要在处理之前重新分区,以确保相同数量的分区和数据均匀地分布在执行程序中......

于 2017-05-21T07:37:56.260 回答