0

我正在使用 Python SDK 在 Apache Beam 中编写一个程序,以从 Pub/Sub 读取 JSON 文件的内容,并对接收到的字符串进行一些处理。这是程序中我从 Pub/Sub 中提取内容并进行处理的部分:

with beam.Pipeline(options=PipelineOptions()) as pipeline:
    lines = pipeline | beam.io.gcp.pubsub.ReadStringsFromPubSub(subscription=known_args.subscription)
    lines_decoded = lines | beam.Map(lambda x: x.decode("base64"))

    lines_split = lines_decoded | (beam.FlatMap(lambda x: x.split('\n')))

    def json_to_tuple(jsonStr):
        res = json.loads(jsonStr)
        ##printing retutn value
        print (res['id'], res['messageSize'])
        ##
        return (res['id'], res['messageSize'])

    tupled = lines_split | beam.Map(json_to_tuple)

    def printlines(line):
        print line

    result = tupled | beam.CombinePerKey(sum)
    result | beam.Map(printlines)

运行程序时,代码在创建 PCollection 后卡住tupled(此后没有执行任何代码行)。奇怪的是,当我将源从 Pub/Sub 更改为包含完全相同内容的本地文件(使用ReadFromText())时,程序运行良好。这种行为的原因可能是什么?

4

1 回答 1

1

根据 Pub/Sub I/O 文档(Apache Beam 文档Dataflow Pub/Sub I/O 文档),默认情况下,PubsubIO 转换与无界 PCollection 一起使用。

PCollections 可以是有界的或无界的:

  • 有界:数据来自固定来源,如文件。
  • 无界:数据来自不断更新的来源,例如 Pub/Sub 订阅。

在对无界 PCollection 进行操作之前,您必须使用以下策略之一:

  • Windowing: unbounded PCollections不能直接用在分组变换上(比如CombinePerKey你正在使用的),所以你应该先设置一个非全局的windowing function
  • 触发器:您可以为无界 PCollection设置触发器,使其定期更新无界数据集,即使订阅中的数据仍在流动。

这可以解释您所看到的行为,即当它从本地文件(这是一个有界数据源)读取时,相同的管道工作,但在从 Pub/Sub 订阅(这是一个无界数据源)读取时不工作。

因此,为了使用 Pub/Sub 订阅,您应该应用窗口或触发策略,以便 PCollection 中的数据可以在以下转换中正确处理。

编辑:另外,正如@Arjun 所发现的,可能需要通过使用以下命令设置适当的 arg 参数来启用流水线中的流式处理:

pipeline_options.view_as(StandardOptions).streaming = True
于 2018-03-24T14:09:07.523 回答