1

在我的场景中,我将 CSV 文件连续上传到 HDFS。

上传新文件后,我想用 Spark SQL 处理新文件(例如,计算文件中字段的最大值,将文件转换为parquet)。即我在每个输入文件和转换/处理的输出文件之间有一个一对一的映射。

我正在评估 Spark Streaming 以监听 HDFS 目录,然后使用 Spark 处理“流文件”。

但是,为了处理整个文件,我需要知道“文件流”何时完成。我想将转换应用于整个文件,以保留文件之间的端到端一对一映射。

如何转换整个文件而不是它的微批次?

据我所知,Spark Streaming 只能将转换应用于批处理(DStreams映射到RDDs),而不是一次应用于整个文件(当其有限流完成时)。

那是对的吗?如果是这样,我应该为我的场景考虑什么替代方案?

4

2 回答 2

3

第一次尝试我可能误解了你的问题......

据我所知,Spark Streaming 只能将转换应用于批处理(映射到 RDD 的 DStream),而不能一次应用于整个文件(当其有限流完成时)。

那是对的吗?

不,这是正确的。

Spark Streaming 将立即对整个文件应用转换,就像在 Spark Streaming 的批处理间隔过去时写入 HDFS 一样。

Spark Streaming 将获取文件的当前内容并开始处理它。


上传新文件后,我需要使用 Spark/SparkSQL 处理新文件

Spark几乎不可能,因为它的架构从“上传”和 Spark 处理它的那一刻起需要一些时间。

您应该考虑使用全新且闪亮的结构化流式处理或(即将过时的)Spark Streaming

这两种解决方案都支持监视新文件的目录并在上传新文件后触发 Spark 作业(这正是您的用例)。

引用结构化流的输入源

在 Spark 2.0 中,有一些内置的源代码。

  • 文件源- 将写入目录中的文件作为数据流读取。支持的文件格式为文本、csv、json、parquet。请参阅 DataStreamReader 接口的文档以获取最新列表以及每种文件格式支持的选项。请注意,文件必须以原子方式放置在给定目录中,在大多数文件系统中,这可以通过文件移动操作来实现。

另请参阅 Spark Streaming 的基本来源

除了套接字之外,StreamingContext API 还提供了从文件作为输入源创建 DStream 的方法。

文件流:为了从与 HDFS API 兼容的任何文件系统(即 HDFS、S3、NFS 等)上的文件读取数据,可以将 DStream 创建为:

streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

Spark Streaming 将监视目录 dataDirectory 并处理在该目录中创建的任何文件(不支持写入嵌套目录中的文件)。

考虑到您的要求,有一个警告:

我需要知道“文件流”何时完成。

不要对 Spark 执行此操作。

再次引用 Spark Streaming 的Basic Sources

  • 必须通过将文件原子地移动或重命名到数据目录中来在 dataDirectory 中创建文件。

  • 移动后,不得更改文件。因此,如果文件被连续追加,则不会读取新数据。

总结...您应该在文件完成并准备好使用 Spark 处理时将文件移动到 Spark 监视的目录。这超出了 Spark 的范围。

于 2017-06-06T10:53:05.767 回答
0

您可以使用 DFSInotifyEventInputStream 观察 Hadoop 目录,然后在创建文件时以编程方式执行 Spark 作业。

看到这篇文章: HDFS 文件观察器

于 2017-06-06T03:54:10.443 回答