0

所以,我刚刚开始使用 Apache Beam。我计划在 GCP 中运行 DataFlow 作业,我最初使用 DataPrep 运行它们,但我很快就超出了它的功能。警告,我已经用 Python 2/3 编程 2 年了,所以我想我已经从新手变成了业余爱好者,只是为了你的意识。所以这是我的问题,我在 IDE 中成功编写了一些 AB 代码(2.6 版)。但我无法得到任何实际工作。也就是说,即使在将 csv 文件读入 PCollection 之后,我也看不到它起作用了。也就是说,它只是说“PCollection Object at 0xf3a6 ...”

所以我在狂热的谷歌搜索时看到了另一个人的帖子,他们说你应该使用“with”语句,以便 Python 自动打开和关闭管道?所以,一旦我这样做了,我至少能够将我刚刚读入的输出写入文件以查看发生了什么。所以,首先,我发现我之前编写的 SAME 代码在我将其放入 with 语句之前没有做任何事情真的很奇怪......这是怎么回事?我是否需要在 with 语句中为管道做所有事情?而其他的定义只是普通的 Python 东西?这是代码:

def run(self, argv=None):

    #p = beam.Pipeline()
    with beam.Pipeline(options=PipelineOptions()) as p:
        left_side = p | 'Read_Left_Side' >> beam.io.ReadFromText('/me/left_side_table.csv')
        left_side | 'Write' >> beam.io.WriteToText('/me/', file_name_suffix='purple_nurple.csv')
        right_side = p | 'Read_Right_Side' >> beam.io.ReadFromText('/me/right_side_table.csv')
    # left_side = p | 'Read_Left_Side' >> beam.io.ReadFromText('gs://path/to/left_side.csv')
    # right_side = p | 'Read_Right_Side' >> beam.io.ReadFromText('gs://path/to/right_side.csv')

    hello=[1,2,3,4,5,6]|beam.Map(lambda x: 3**x)

    left_side = p | 'Read' >> beam.io.ReadFromText('/me/left_side_table.csv')
    left_side | 'Write' >> beam.io.WriteToText('/me/', file_name_suffix='purple_nurple.csv')
    print(left_side)
    right_side = p | 'Read' >> beam.io.ReadFromText('/me//right_side_table.csv')
    howdy= left_side|beam.Map(lambda x: x/2)
    pass
4

1 回答 1

0

您需要调用 piepleine.run() 来执行管道。Beam pipeline 也遵循这里提到的资源习惯用法https://docs.python.org/2.7/reference/compound_stmts.html#the-with-statement 所以当你使用时with pipeline,你不需要调用 pipeline.run()。您可以在代码中使用任何一种方法。回答您的问题

所以,首先,我发现我之前编写的 SAME 代码在我将其放入 with 语句之前没有做任何事情真的很奇怪......这是怎么回事?

Beam Pipeline 在此处遵循成语https://docs.python.org/2.7/reference/compound_stmts.html#the-with-statement

我是否需要在 with 语句中为管道做所有事情?

如果您使用资源成语,那么是的。但是,如果您自己调用 pipeline.run() ,则没有 with 语句。在您的代码中,您使用的是“with”,因此“with”之后的管道修改不会应用于作业。

而其他的定义只是普通的 Python 东西?

哪个定义?

于 2018-08-26T03:15:11.420 回答