3

我正在尝试从 Spark Structured Streaming 中的 S3 存储桶读取数据。下面的代码用于获取现有数据。但是,当新数据添加到存储桶时,Spark 不会选择这个。

val lines = spark.readStream.schema(schemaImp).format("com.databricks.spark.avro").load("s3n://bucket/*")
val query = lines.writeStream.outputMode("append").format("memory").queryName("memtable").start()
query.processAllAvailable()
spark.sql("select * from memtable").show()

我怎样才能使这项工作来获取新数据?或者,这是一个还不支持的功能吗?

4

1 回答 1

1

首先针对本地 FS 进行测试。如果它在那里工作但不适用于 S3,那么它与 s3 重命名/提交有些怪癖。如果它不适用于本地 FS,那么这就是您使用流媒体的方式。也许尝试一个测试,它会在每次调用 .map() 时记录下来,这样您就可以计算实际使用情况。

如果您正在使用流式传输和对象存储,(a) 使用 s3a 而不是 s3n,并且 (b) 直接保存到对象存储路径中,而不是保存 + 重命名 - 您只需要避免处理文件系统上的不完整数据在写入时可见

于 2016-12-10T17:55:23.697 回答