我正在使用 Python SDK 在 Apache Beam 中编写一个程序,以从 Pub/Sub 读取 JSON 文件的内容,并对接收到的字符串进行一些处理。这是程序中我从 Pub/Sub 中提取内容并进行处理的部分:
with beam.Pipeline(options=PipelineOptions()) as pipeline:
lines = pipeline | beam.io.gcp.pubsub.ReadStringsFromPubSub(subscription=known_args.subscription)
lines_decoded = lines | beam.Map(lambda x: x.decode("base64"))
lines_split = lines_decoded | (beam.FlatMap(lambda x: x.split('\n')))
def json_to_tuple(jsonStr):
res = json.loads(jsonStr)
##printing retutn value
print (res['id'], res['messageSize'])
##
return (res['id'], res['messageSize'])
tupled = lines_split | beam.Map(json_to_tuple)
def printlines(line):
print line
result = tupled | beam.CombinePerKey(sum)
result | beam.Map(printlines)
运行程序时,代码在创建 PCollection 后卡住tupled
(此后没有执行任何代码行)。奇怪的是,当我将源从 Pub/Sub 更改为包含完全相同内容的本地文件(使用ReadFromText()
)时,程序运行良好。这种行为的原因可能是什么?