所以,我刚刚开始使用 Apache Beam。我计划在 GCP 中运行 DataFlow 作业,我最初使用 DataPrep 运行它们,但我很快就超出了它的功能。警告,我已经用 Python 2/3 编程 2 年了,所以我想我已经从新手变成了业余爱好者,只是为了你的意识。所以这是我的问题,我在 IDE 中成功编写了一些 AB 代码(2.6 版)。但我无法得到任何实际工作。也就是说,即使在将 csv 文件读入 PCollection 之后,我也看不到它起作用了。也就是说,它只是说“PCollection Object at 0xf3a6 ...”
所以我在狂热的谷歌搜索时看到了另一个人的帖子,他们说你应该使用“with”语句,以便 Python 自动打开和关闭管道?所以,一旦我这样做了,我至少能够将我刚刚读入的输出写入文件以查看发生了什么。所以,首先,我发现我之前编写的 SAME 代码在我将其放入 with 语句之前没有做任何事情真的很奇怪......这是怎么回事?我是否需要在 with 语句中为管道做所有事情?而其他的定义只是普通的 Python 东西?这是代码:
def run(self, argv=None):
#p = beam.Pipeline()
with beam.Pipeline(options=PipelineOptions()) as p:
left_side = p | 'Read_Left_Side' >> beam.io.ReadFromText('/me/left_side_table.csv')
left_side | 'Write' >> beam.io.WriteToText('/me/', file_name_suffix='purple_nurple.csv')
right_side = p | 'Read_Right_Side' >> beam.io.ReadFromText('/me/right_side_table.csv')
# left_side = p | 'Read_Left_Side' >> beam.io.ReadFromText('gs://path/to/left_side.csv')
# right_side = p | 'Read_Right_Side' >> beam.io.ReadFromText('gs://path/to/right_side.csv')
hello=[1,2,3,4,5,6]|beam.Map(lambda x: 3**x)
left_side = p | 'Read' >> beam.io.ReadFromText('/me/left_side_table.csv')
left_side | 'Write' >> beam.io.WriteToText('/me/', file_name_suffix='purple_nurple.csv')
print(left_side)
right_side = p | 'Read' >> beam.io.ReadFromText('/me//right_side_table.csv')
howdy= left_side|beam.Map(lambda x: x/2)
pass