我正在尝试从使用 beam-sdks-java-io-hadoop-file-system v2.0.0 和 Spark 作为运行器的梁应用程序中的 AWS EMR 集群中读取 S3。我可以在纱线日志中看到管道能够检测到 S3 中存在的文件,但它无法读取该文件。请参阅下面的日志。
17/06/27 03:29:25 INFO FileBasedSource: Filepattern s3a://xxx/test-folder/* matched 1 files with total size 3410584
17/06/27 03:29:25 INFO FileBasedSource: Matched 1 files for pattern s3a://xxx/test-folder/*
17/06/27 03:29:25 INFO FileBasedSource: Splitting filepattern s3a://xxx/test-folder/* into bundles of size 1705292 took 82 ms and produced 1 files and 1 bundles
17/06/27 03:29:25 INFO SparkContext: Starting job: foreach at BoundedDataset.java:109
17/06/27 03:29:33 INFO BlockManagerInfo:在内存中添加了广播_0_piece0 ip-10-130-237-237.vpc.internal:40063(大小:4.6 KB,免费:3.5 GB)17/06/27 03 :29:36 WARN TaskSetManager:在 0.0 阶段丢失任务 0.0(TID 0,ip-10-130-237-237.vpc.internal):java.lang.RuntimeException:读取数据失败。在 org.apache.beam.runners.spark.io.SourceRDD$Bounded$ReaderToIteratorAdapter.tryProduceNext(SourceRDD.java:198) 在 org.apache.beam.runners.spark.io.SourceRDD$Bounded$ReaderToIteratorAdapter.hasNext(SourceRDD. java:239) 在 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) 在 org.apache.spark.storage.MemoryStore 的 scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:41) .unrollSafely(MemoryStore.scala:284) 在 org.apache.spark.CacheManager。
当我使用输入文件系统运行相同的代码时HDFS
,它可以完美运行。有人可以帮我弄清楚如何从 S3 读取数据吗?输入格式是 gzip 压缩的文本文件。
代码:
HadoopFileSystemOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(HadoopFileSystemOptions.class);
Pipeline p = Pipeline.create(options);
p.apply("ReadLines", TextIO.read().from(options.getHdfsConfiguration().get(0).get("fs.default.name")))
.apply(ParDo.of(new PrintLineTransform()));
使用 S3 运行:
--hdfsConfiguration='[{"fs.default.name": "s3a://xxx/test-folder/*"}]
使用 HDFS 运行:
--hdfsConfiguration='[{"fs.default.name": "hdfs://xxx/test-folder/*"}]