8

来自 spark 结构化流文档:“此检查点位置必须是 HDFS 兼容文件系统中的路径,并且可以DataStreamWriter在启动查询时设置为选项。”

果然,将检查点设置为 s3 路径会引发:

17/01/31 21:23:56 ERROR ApplicationMaster: User class threw exception: java.lang.IllegalArgumentException: Wrong FS: s3://xxxx/fact_checkpoints/metadata, expected: hdfs://xxxx:8020 
java.lang.IllegalArgumentException: Wrong FS: s3://xxxx/fact_checkpoints/metadata, expected: hdfs://xxxx:8020 
        at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:652) 
        at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:194) 
        at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:106) 
        at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305) 
        at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301) 
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) 
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1301) 
        at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1430) 
        at org.apache.spark.sql.execution.streaming.StreamMetadata$.read(StreamMetadata.scala:51) 
        at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:100) 
        at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232) 
        at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269) 
        at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262) 
        at com.roku.dea.spark.streaming.FactDeviceLogsProcessor$.main(FactDeviceLogsProcessor.scala:133) 
        at com.roku.dea.spark.streaming.FactDeviceLogsProcessor.main(FactDeviceLogsProcessor.scala) 
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
        at java.lang.reflect.Method.invoke(Method.java:498) 
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637) 
17/01/31 21:23:56 INFO SparkContext: Invoking stop() from shutdown hook 

这里有几个问题:

  1. 为什么不支持 s3 作为检查点目录(常规 spark 流支持此)?是什么让文件系统“符合 HDFS”?
  2. 我临时使用 HDFS(因为集群可以随时启动或关闭)并使用 s3 作为保存所有数据的地方 - 在这种设置中存储结构化流数据的检查点数据的建议是什么?
4

5 回答 5

10

是什么让 FS HDFS “兼容”?它是一个文件系统,具有Hadoop FS 规范中指定的行为。对象存储和 FS 之间的区别在此处进行了介绍,关键是“最终一致的对象存储没有附加或 O(1) 原子重命名不兼容”

特别是对于 S3

  1. 它并不一致:在创建新 blob 后,列表命令通常不会显示它。删除也一样。
  2. 当 blob 被覆盖或删除时,可能需要一段时间才能消失
  3. rename() 是通过复制然后删除来实现的

通过将所有内容保存到一个位置然后将其重命名为检查点目录来激发流检查点。这使得检查点的时间与在 S3 中复制数据的时间成正比,约为 6-10 MB/s。

当前的流代码不适合 s3

现在,做一个

  • 检查点到 HDFS,然后复制结果
  • 检查点到分配并附加到您的集群的一些 EBS
  • 检查点到 S3,但检查点之间的间隔很长,因此检查点的时间不会使您的流式应用程序停机。

如果您使用的是 EMR,则可以为一致的、由 dynamo DB 支持的 S3 支付额外费用,这将为您提供更好的一致性。但是复制时间还是一样的,所以检查点也一样慢

于 2017-02-03T09:56:07.293 回答
7

这是一个已知问题:https ://issues.apache.org/jira/browse/SPARK-19407

应该在下一个版本中修复。--conf spark.hadoop.fs.defaultFS=s3作为一种解决方法,您可以将默认文件系统设置为 s3 。

于 2017-02-02T21:19:43.530 回答
4

此问题已在https://issues.apache.org/jira/browse/SPARK-19407中修复。

然而,结构化流检查点在 S3 中不能很好地工作,因为 S3 中缺乏最终的一致性。使用 S3 检查点https://issues.apache.org/jira/browse/SPARK-19013不是一个好主意。

Micheal Armburst 表示这不会在 Spark 中修复,解决方案是等待 S3guard 实施。S3Guard 有时会消失。

编辑:自发表这篇文章以来的 2 项发展 a) 对 S3Guard 的支持已合并到 Spark 3.0 中。b) AWS 使 S3 立即保持一致。

于 2017-07-12T14:50:08.927 回答
1

是的,如果您使用的是 Spark Structured Streaming 版本 3 或更高版本。首先,创建一个SparkSession并将 S3 配置添加到其上下文中。

val sparkSession = SparkSession
    .builder()
    .master(sparkMasterUrl)
    .appName(appName)
    .getOrCreate()

sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "accessKey")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "secretKey")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "http://s3URL:s3Port")
sparkSession.sparkContext.hadoopConfiguration.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

稍后,checkpointLocation在开始查询之前使用 S3 存储桶添加配置。例如:

val streamingQuery = streamingDF.writeStream
    .option("checkpointLocation", "s3a://bucketName/checkpointDir/")
    .foreachBatch{(batchDF: DataFrame, batchId: Long) =>
       // Transform and write batchDF
     }.start()

streamingQuery.awaitTermination()
于 2021-03-24T16:56:02.100 回答
0

您可以使用 s3 作为检查点,但您应该启用 EMRFS,以便处理 s3 一致性。

于 2020-09-02T09:01:51.650 回答