1

我正在尝试从使用 beam-sdks-java-io-hadoop-file-system v2.0.0 和 Spark 作为运行器的梁应用程序中的 AWS EMR 集群中读取 S3。我可以在纱线日志中看到管道能够检测到 S3 中存在的文件,但它无法读取该文件。请参阅下面的日志。

17/06/27 03:29:25 INFO FileBasedSource: Filepattern s3a://xxx/test-folder/* matched 1 files with total size 3410584
17/06/27 03:29:25 INFO FileBasedSource: Matched 1 files for pattern s3a://xxx/test-folder/*
17/06/27 03:29:25 INFO FileBasedSource: Splitting filepattern s3a://xxx/test-folder/* into bundles of size 1705292 took 82 ms and produced 1 files and 1 bundles
17/06/27 03:29:25 INFO SparkContext: Starting job: foreach at BoundedDataset.java:109

17/06/27 03:29:33 INFO BlockManagerInfo:在内存中添加了广播_0_piece0 ip-10-130-237-237.vpc.internal:40063(大小:4.6 KB,免费:3.5 GB)17/06/27 03 :29:36 WARN TaskSetManager:在 0.0 阶段丢失任务 0.0(TID 0,ip-10-130-237-237.vpc.internal):java.lang.RuntimeException:读取数据失败。在 org.apache.beam.runners.spark.io.SourceRDD$Bounded$ReaderToIteratorAdapter.tryProduceNext(SourceRDD.java:198) 在 org.apache.beam.runners.spark.io.SourceRDD$Bounded$ReaderToIteratorAdapter.hasNext(SourceRDD. java:239) 在 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) 在 org.apache.spark.storage.MemoryStore 的 scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:41) .unrollSafely(MemoryStore.scala:284) 在 org.apache.spark.CacheManager。

当我使用输入文件系统运行相同的代码时HDFS,它可以完美运行。有人可以帮我弄清楚如何从 S3 读取数据吗?输入格式是 gzip 压缩的文本文件。

代码:

HadoopFileSystemOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(HadoopFileSystemOptions.class);
Pipeline p = Pipeline.create(options);
p.apply("ReadLines", TextIO.read().from(options.getHdfsConfiguration().get(0).get("fs.default.name")))
 .apply(ParDo.of(new PrintLineTransform()));

使用 S3 运行:

--hdfsConfiguration='[{"fs.default.name": "s3a://xxx/test-folder/*"}]

使用 HDFS 运行:

--hdfsConfiguration='[{"fs.default.name": "hdfs://xxx/test-folder/*"}]
4

1 回答 1

1

日志说Byte-buffer read unsupported by input stream

看起来梁需要在输入流中进行 API 扩展,而这实际上并未在 S3 客户端中实现,因此失败了。尽管最近添加了 JIRA HADOOP-14603,但该功能甚至在未发布的 Hadoop 版本中都没有。

没有人会急于实现该功能,因为 (a) 有更重要的事情要做,并且 (b) 这是一个小的性能优化(内存效率),实际上并没有被任何重要的东西使用。您也无法在本地文件系统或 Azure 中获得它。

修复:

  1. 通过测试实现 Hadoop 功能。我保证会为你复习。
  2. 说服别人为你做这件事。对于我们专业的 hadoop 开发人员来说,这通常发生在重要的人需要它时,无论是管理层还是客户。
  3. 让 Beam 处理不支持该功能的文件系统并抛出UnsupportedOperationException,回退到read(byte[]). 这就是他们所要做的。

解决方法

  • 以不同的压缩格式存储您的数据
  • 先复制到HDFS。

我建议您首先在 BEAM 下查找Apache JIRA上的错误/堆栈跟踪,如果不存在则提交新报告。看看他们怎么说。

更新:密切关注BEAM-2500

于 2017-06-29T19:28:43.003 回答