3

我正在尝试从多个路径将一些 avro 文件读取到 DataFrame 中。假设我的路径是"s3a://bucket_name/path/to/file/year=18/month=11/day=01" 在这条路径下我还有两个分区让我们说country=XX/region=XX

我想一次读取多个日期而不明确命名国家和地区分区。此外,我希望国家和地区成为此 DataFrame 中的列。

sqlContext.read.format("com.databricks.spark.avro").load("s3a://bucket_name/path/to/file/year=18/month=11/day=01")

这条线运行良好,因为我只阅读了一条路径。它检测国家和地区分区并推断其架构。

当我尝试阅读多个日期时,让我们说

val paths = Seq("s3a://bucket_name/path/to/file/year=18/month=11/day=01", "s3a://bucket_name/path/to/file/year=18/month=11/day=02")

sqlContext.read.format("com.databricks.spark.avro").load(paths:_*)

我收到此错误:

    18/12/03 03:13:53 WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result insub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
18/12/03 03:13:53 WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
java.lang.AssertionError: assertion failed: Conflicting directory structures detected. Suspicious paths:?
 s3a://bucket_name/path/to/file/year=18/month=11/day=02
s3a://bucket_name/path/to/file/year=18/month=11/day=01
    
If provided paths are partition directories, please set "basePath" in the options of the data source to specify the root directory of the table. If there are multiple root directories, please load them separately and then union them.
        at scala.Predef$.assert(Predef.scala:179)
        at org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:106)
        at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$discoverPartitions(interfaces.scala:621)
        at org.apache.spark.sql.sources.HadoopFsRelation$$anonfun$partitionSpec$3.apply(interfaces.scala:526)
        at org.apache.spark.sql.sources.HadoopFsRelation$$anonfun$partitionSpec$3.apply(interfaces.scala:525)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.sql.sources.HadoopFsRelation.partitionSpec(interfaces.scala:524)
        at org.apache.spark.sql.sources.HadoopFsRelation$$anonfun$partitionColumns$1.apply(interfaces.scala:578)
        at org.apache.spark.sql.sources.HadoopFsRelation$$anonfun$partitionColumns$1.apply(interfaces.scala:578)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.sql.sources.HadoopFsRelation.partitionColumns(interfaces.scala:578)
        at org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:637)
        at org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:635)
        at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:39)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:136)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
        at $iwC$$iwC$$iwC.<init>(<console>:38)
        at $iwC$$iwC.<init>(<console>:40)
        at $iwC.<init>(<console>:42)
        at <init>(<console>:44)
        at .<init>(<console>:48)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1045)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1326)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:821)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:852)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:800)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1064)
        at org.apache.spark.repl.Main$.main(Main.scala:35)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:730)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

显然我不能使用 basePath 因为路径不共享一个。我还尝试在每条路径的末尾使用 /*,这确实有效,但完全忽略了国家和地区分区。

我可以一一阅读路径并将其合并,但我觉得我错过了一些东西。

知道为什么它仅适用于单个路径以及如何使其适用于多个路径吗?

4

2 回答 2

2

真希望所有的错误信息都一样清楚——If provided paths are partition directories, please set "basePath" in the options of the data source to specify the root directory of the table. If there are multiple root directories, please load them separately and then union them.

相对路径year=18/month=11/day=01是由于分区造成的,还是您只是使用了相同的约定?

如果前者是正确的,那么您应该只阅读s3a://bucket_name/path/to/file/,并使用谓词来过滤所需的日期。或者也许正如错误所建议的那样,您可以尝试sqlContext.read.option("basePath","s3a://bucket_name/path/to/file/").format("com.databricks.spark.avro").load(paths:_*),其中路径是相对的

如果后者为真,那么您应该分别查询每个并应用于unionAll数据帧(如错误消息所示)。在这种情况下,也许将年/月/日视为分区列也可以,即使您在写入数据时没有使用 partitionBy ...

于 2018-12-03T09:00:47.423 回答
2

老问题,但这就是我最终在类似情况下所做的事情

spark.read.parquet(paths:_*)
  .withColumn("year", regexp_extract(input_file_name, "year=(.+?)/", 1))
  .withColumn("month", regexp_extract(input_file_name, "month=(.+?)/", 1))
  .withColumn("day", regexp_extract(input_file_name, "day=(.+?)/", 1))

当你有一个静态分区结构时工作。谁来挑战将其扩展为动态(即解析出“x=y/z=c”形式的任意分区结构并将其转换为列)?

于 2020-09-18T02:30:06.687 回答