2

我正在使用带有 Python SDK 的 Google Cloud Dataflow。

我想 :

  • 从主 PCollection 中获取唯一日期列表
  • 遍历该列表中的日期以创建过滤后的 PCollection(每个都有唯一的日期),并将每个过滤后的 PCollection 写入 BigQuery 中时间分区表中的分区。

我怎样才能得到那个清单?在以下组合转换之后,我创建了一个 ListPCollectionView 对象,但我无法迭代该对象:

class ToUniqueList(beam.CombineFn):

    def create_accumulator(self):
        return []

    def add_input(self, accumulator, element):
        if element not in accumulator:
            accumulator.append(element)
        return accumulator

    def merge_accumulators(self, accumulators):
        return list(set(accumulators))

    def extract_output(self, accumulator):
        return accumulator


def get_list_of_dates(pcoll):

    return (pcoll
            | 'get the list of dates' >> beam.CombineGlobally(ToUniqueList()))

我做错了吗?最好的方法是什么?

谢谢。

4

1 回答 1

6

无法PCollection直接获取 a 的内容 - Apache Beam 或 Dataflow 管道更像是一个查询计划,应该进行哪些处理,是PCollection计划中的逻辑中间节点,而不是包含数据。主程序组装计划(管道)并启动它。

但是,最终您会尝试将数据写入按日期分片的 BigQuery 表。此用例当前仅在 Java SDK 中受支持,并且仅适用于流式管道。

对于根据数据将数据写入多个目标的更一般处理,请遵循BEAM-92

另请参阅通过 Google Cloud Dataflow 创建/写入 Parititoned BigQuery 表

于 2017-01-03T20:56:07.157 回答