1

我在这里运行 Sentiment 示例以进行 tensorflow 转换。 https://github.com/tensorflow/transform/blob/master/examples/sentiment_example.py

对于第 78-98 行中定义的 fn ReadAndShuffleData(),是否有可能以类似的方式加载文件但从 HDFS,而不是 GCS?

我已经用几个梁 API(beams-2.8.0)尝试了一整天但失败了,我认为最有希望的是使用 beams.io.hadoopfilesystem。但是这个 fn 实际上会产生一个 python 文件对象,并且无法在梁管道中使用 beams.io.ReadFromText() 读取。

我还正确地传入了 HadoopFileSystemPipelineOptions。任何人都可以告诉我解决问题的方向或 2/3 行代码片段或解决方法?非常感谢!

ps hadoop 2.7.7,beams 2.8,数据加载正确。

我想我可能在这里缺乏一些理论理解,任何参考将不胜感激!

4

1 回答 1

2

您可以使用apache_beam.Create转换:

初始化签名:beam.Create(self, values, reshuffle=True)

Docstring:从可迭代对象创建 PCollection 的转换。

import apache_beam as beam
from apache_beam.options.pipeline_options import HadoopFileSystemOptions
from apache_beam.io.hadoopfilesystem import HadoopFileSystem

HDFS_HOSTNAME = 'foo.hadoop.com'
HDFS_PORT = 50070
hdfs_client_options = HadoopFileSystemOptions(hdfs_host=HDFS_HOSTNAME, hdfs_port=HDFS_PORT, hdfs_user="foobar")
hdfs_client = HadoopFileSystem(hdfs_client_options)

input_file_hdfs = "hdfs://foo/bar.csv"
f = hdfs_client.open(input_file_hdfs)

p = beam.Pipeline(options=PipelineOptions())
lines = p | 'ReadMyFile' >> beam.Create(f)
res = lines | "WriteMyFile" >> beam.io.WriteToText("./bar", ".csv")
p.run()
于 2019-05-28T09:56:51.157 回答